Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/08 23:53:47 UTC

[spark] branch master updated: [SPARK-43700][SPARK-43701][CONNECT][PS] Enable `TimedeltaOps.(sub|rsub)` with Spark Connect

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e6c6d444ae07 [SPARK-43700][SPARK-43701][CONNECT][PS] Enable `TimedeltaOps.(sub|rsub)` with Spark Connect
e6c6d444ae07 is described below

commit e6c6d444ae07f1ece127cea6332cce906b5aa1c5
Author: itholic <ha...@databricks.com>
AuthorDate: Fri Jun 9 08:53:30 2023 +0900

    [SPARK-43700][SPARK-43701][CONNECT][PS] Enable `TimedeltaOps.(sub|rsub)` with Spark Connect
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to enable the pandas-on-Spark `TimedeltaOps.sub` and `TimedeltaOps.rsub` operations for Spark Connect.
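    
    Mechanically, the change swaps direct references to the classic
    `pyspark.sql.column.Column` dunders (`column_op(Column.__sub__)`) for the
    name-based `pyspark_column_op("__sub__")` helper, which resolves the
    operator on whichever Column implementation is in use (classic or
    Connect), and it teaches the Connect-side `_bin_op` helper in
    `python/pyspark/sql/connect/column.py` to accept `datetime.timedelta`
    operands as literals. As a simplified, hypothetical sketch of the
    name-based dispatch idea (the real helper lives in the pandas-on-Spark
    `data_type_ops` package and handles more cases):
    
        # Hypothetical, simplified sketch; not the actual implementation.
        from typing import Any, Callable
    
        def pyspark_column_op_sketch(name: str) -> Callable[[Any, Any], Any]:
            from pyspark.pandas.base import column_op
    
            # Resolve the dunder (e.g. "__sub__") by name at call time, so
            # the same code path works for both pyspark.sql.Column and the
            # Spark Connect Column.
            return column_op(lambda col, other: getattr(col, name)(other))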
    
    ### Why are the changes needed?
    
    To support the pandas API with Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the pandas-on-Spark APIs `TimedeltaOps.(sub|rsub)` are now available with Spark Connect.
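    
    For example, both directions of timedelta subtraction now work under a
    Spark Connect session. A minimal sketch (assumes a remote session is
    already configured, e.g. via the `SPARK_REMOTE` environment variable;
    the data is illustrative):
    
        import datetime
        import pyspark.pandas as ps
    
        psser = ps.Series([datetime.timedelta(days=2), datetime.timedelta(hours=6)])
    
        # TimedeltaOps.sub: Series - timedelta
        psser - datetime.timedelta(hours=1)
    
        # TimedeltaOps.rsub: timedelta - Series
        datetime.timedelta(days=3) - psser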
    
    ### How was this patch tested?
    
    Reused the existing unit tests; the `test_sub` and `test_rsub` parity tests previously skipped for Spark Connect are now enabled.
    
    Closes #41512 from itholic/SPARK-43700.
    
    Authored-by: itholic <ha...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/pandas/data_type_ops/timedelta_ops.py        |  9 ++-------
 .../connect/data_type_ops/test_parity_timedelta_ops.py      | 13 +++++--------
 python/pyspark/sql/connect/column.py                        | 12 +++++++++++-
 3 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/pandas/data_type_ops/timedelta_ops.py b/python/pyspark/pandas/data_type_ops/timedelta_ops.py
index 47f8eec360fd..3e96ebbb13ab 100644
--- a/python/pyspark/pandas/data_type_ops/timedelta_ops.py
+++ b/python/pyspark/pandas/data_type_ops/timedelta_ops.py
@@ -21,7 +21,6 @@ from typing import Any, Union
 import pandas as pd
 from pandas.api.types import CategoricalDtype
 
-from pyspark.sql.column import Column
 from pyspark.sql.types import (
     BooleanType,
     DayTimeIntervalType,
@@ -66,8 +65,6 @@ class TimedeltaOps(DataTypeOps):
         return col
 
     def sub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
-        from pyspark.pandas.base import column_op
-
         _sanitize_list_like(right)
 
         if (
@@ -75,17 +72,15 @@ class TimedeltaOps(DataTypeOps):
             and isinstance(right.spark.data_type, DayTimeIntervalType)
             or isinstance(right, timedelta)
         ):
-            return column_op(Column.__sub__)(left, right)
+            return pyspark_column_op("__sub__")(left, right)
         else:
             raise TypeError("Timedelta subtraction can only be applied to timedelta series.")
 
     def rsub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
-        from pyspark.pandas.base import column_op
-
         _sanitize_list_like(right)
 
         if isinstance(right, timedelta):
-            return column_op(Column.__rsub__)(left, right)
+            return pyspark_column_op("__rsub__")(left, right)
         else:
             raise TypeError("Timedelta subtraction can only be applied to timedelta series.")
 
diff --git a/python/pyspark/pandas/tests/connect/data_type_ops/test_parity_timedelta_ops.py b/python/pyspark/pandas/tests/connect/data_type_ops/test_parity_timedelta_ops.py
index 77e3519bf9cd..058dd2bfd3f7 100644
--- a/python/pyspark/pandas/tests/connect/data_type_ops/test_parity_timedelta_ops.py
+++ b/python/pyspark/pandas/tests/connect/data_type_ops/test_parity_timedelta_ops.py
@@ -16,6 +16,7 @@
 #
 import unittest
 
+import pyspark.pandas as ps
 from pyspark.pandas.tests.data_type_ops.test_timedelta_ops import TimedeltaOpsTestsMixin
 from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase
 from pyspark.testing.pandasutils import PandasOnSparkTestUtils
@@ -25,18 +26,14 @@ from pyspark.testing.connectutils import ReusedConnectTestCase
 class TimedeltaOpsParityTests(
     TimedeltaOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase
 ):
+    @property
+    def psdf(self):
+        return ps.from_pandas(self.pdf)
+
     @unittest.skip("TODO(SPARK-43620): Support `Column` for SparkConnectColumn.__getitem__.")
     def test_astype(self):
         super().test_astype()
 
-    @unittest.skip("TODO(SPARK-43700): Fix TimedeltaOps.rsub to work with Spark Connect.")
-    def test_rsub(self):
-        super().test_rsub()
-
-    @unittest.skip("TODO(SPARK-43701): Fix TimedeltaOps.sub to work with Spark Connect.")
-    def test_sub(self):
-        super().test_sub()
-
 
 if __name__ == "__main__":
     from pyspark.pandas.tests.connect.data_type_ops.test_parity_timedelta_ops import *  # noqa: F401
diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py
index 2fdc3aabd4a2..4d32da56192f 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -73,7 +73,17 @@ def _bin_op(
 ) -> Callable[["Column", Any], "Column"]:
     def wrapped(self: "Column", other: Any) -> "Column":
         if other is None or isinstance(
-            other, (bool, float, int, str, datetime.datetime, datetime.date, decimal.Decimal)
+            other,
+            (
+                bool,
+                float,
+                int,
+                str,
+                datetime.datetime,
+                datetime.date,
+                decimal.Decimal,
+                datetime.timedelta,
+            ),
         ):
             other_expr = LiteralExpression._from_value(other)
         else:
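
For reference, the `_bin_op` change above means a bare `datetime.timedelta`
operand on a Spark Connect `Column` is now wrapped as a `LiteralExpression`,
matching how other Python scalars (bool, int, str, datetime, ...) are
handled. A minimal sketch at the DataFrame level (assumes `spark` is a Spark
Connect session; the schema string and data are illustrative):

    import datetime

    df = spark.createDataFrame(
        [(datetime.timedelta(days=1),)], "t interval day to second"
    )
    # Column - timedelta: the timedelta operand becomes a literal expression.
    df.select(df.t - datetime.timedelta(hours=1)).show()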


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org