You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/08 23:53:47 UTC
[spark] branch master updated: [SPARK-43700][SPARK-43701][CONNECT][PS] Enable `TimedeltaOps.(sub|rsub)` with Spark Connect
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e6c6d444ae07 [SPARK-43700][SPARK-43701][CONNECT][PS] Enable `TimedeltaOps.(sub|rsub)` with Spark Connect
e6c6d444ae07 is described below
commit e6c6d444ae07f1ece127cea6332cce906b5aa1c5
Author: itholic <ha...@databricks.com>
AuthorDate: Fri Jun 9 08:53:30 2023 +0900
[SPARK-43700][SPARK-43701][CONNECT][PS] Enable `TimedeltaOps.(sub|rsub)` with Spark Connect
### What changes were proposed in this pull request?
This PR proposes to enable pandas-on-Spark `TimedeltaOps.sub` & `TimedeltaOps.rsub` for Spark Connect.
### Why are the changes needed?
To support the pandas API with Spark Connect.
### Does this PR introduce _any_ user-facing change?
The pandas-on-Spark APIs `TimedeltaOps.(sub|rsub)` are available on Spark Connect.
### How was this patch tested?
Reusing existing UTs.
Closes #41512 from itholic/SPARK-43700.
Authored-by: itholic <ha...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/pandas/data_type_ops/timedelta_ops.py | 9 ++-------
.../connect/data_type_ops/test_parity_timedelta_ops.py | 13 +++++--------
python/pyspark/sql/connect/column.py | 12 +++++++++++-
3 files changed, 18 insertions(+), 16 deletions(-)
diff --git a/python/pyspark/pandas/data_type_ops/timedelta_ops.py b/python/pyspark/pandas/data_type_ops/timedelta_ops.py
index 47f8eec360fd..3e96ebbb13ab 100644
--- a/python/pyspark/pandas/data_type_ops/timedelta_ops.py
+++ b/python/pyspark/pandas/data_type_ops/timedelta_ops.py
@@ -21,7 +21,6 @@ from typing import Any, Union
import pandas as pd
from pandas.api.types import CategoricalDtype
-from pyspark.sql.column import Column
from pyspark.sql.types import (
BooleanType,
DayTimeIntervalType,
@@ -66,8 +65,6 @@ class TimedeltaOps(DataTypeOps):
return col
def sub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
- from pyspark.pandas.base import column_op
-
_sanitize_list_like(right)
if (
@@ -75,17 +72,15 @@ class TimedeltaOps(DataTypeOps):
and isinstance(right.spark.data_type, DayTimeIntervalType)
or isinstance(right, timedelta)
):
- return column_op(Column.__sub__)(left, right)
+ return pyspark_column_op("__sub__")(left, right)
else:
raise TypeError("Timedelta subtraction can only be applied to timedelta series.")
def rsub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
- from pyspark.pandas.base import column_op
-
_sanitize_list_like(right)
if isinstance(right, timedelta):
- return column_op(Column.__rsub__)(left, right)
+ return pyspark_column_op("__rsub__")(left, right)
else:
raise TypeError("Timedelta subtraction can only be applied to timedelta series.")
diff --git a/python/pyspark/pandas/tests/connect/data_type_ops/test_parity_timedelta_ops.py b/python/pyspark/pandas/tests/connect/data_type_ops/test_parity_timedelta_ops.py
index 77e3519bf9cd..058dd2bfd3f7 100644
--- a/python/pyspark/pandas/tests/connect/data_type_ops/test_parity_timedelta_ops.py
+++ b/python/pyspark/pandas/tests/connect/data_type_ops/test_parity_timedelta_ops.py
@@ -16,6 +16,7 @@
#
import unittest
+import pyspark.pandas as ps
from pyspark.pandas.tests.data_type_ops.test_timedelta_ops import TimedeltaOpsTestsMixin
from pyspark.pandas.tests.connect.data_type_ops.testing_utils import OpsTestBase
from pyspark.testing.pandasutils import PandasOnSparkTestUtils
@@ -25,18 +26,14 @@ from pyspark.testing.connectutils import ReusedConnectTestCase
class TimedeltaOpsParityTests(
TimedeltaOpsTestsMixin, PandasOnSparkTestUtils, OpsTestBase, ReusedConnectTestCase
):
+ @property
+ def psdf(self):
+ return ps.from_pandas(self.pdf)
+
@unittest.skip("TODO(SPARK-43620): Support `Column` for SparkConnectColumn.__getitem__.")
def test_astype(self):
super().test_astype()
- @unittest.skip("TODO(SPARK-43700): Fix TimedeltaOps.rsub to work with Spark Connect.")
- def test_rsub(self):
- super().test_rsub()
-
- @unittest.skip("TODO(SPARK-43701): Fix TimedeltaOps.sub to work with Spark Connect.")
- def test_sub(self):
- super().test_sub()
-
if __name__ == "__main__":
from pyspark.pandas.tests.connect.data_type_ops.test_parity_timedelta_ops import * # noqa: F401
diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py
index 2fdc3aabd4a2..4d32da56192f 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -73,7 +73,17 @@ def _bin_op(
) -> Callable[["Column", Any], "Column"]:
def wrapped(self: "Column", other: Any) -> "Column":
if other is None or isinstance(
- other, (bool, float, int, str, datetime.datetime, datetime.date, decimal.Decimal)
+ other,
+ (
+ bool,
+ float,
+ int,
+ str,
+ datetime.datetime,
+ datetime.date,
+ decimal.Decimal,
+ datetime.timedelta,
+ ),
):
other_expr = LiteralExpression._from_value(other)
else:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org