Posted to commits@spark.apache.org by ue...@apache.org on 2021/07/07 20:48:01 UTC
[spark] branch branch-3.2 updated: [SPARK-35615][PYTHON] Make unary and comparison operators data-type-based
This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 61bfdf0 [SPARK-35615][PYTHON] Make unary and comparison operators data-type-based
61bfdf0 is described below
commit 61bfdf0c037ce54e4344bc95f1a64b21f4c607e3
Author: Xinrong Meng <xi...@databricks.com>
AuthorDate: Wed Jul 7 13:46:50 2021 -0700
[SPARK-35615][PYTHON] Make unary and comparison operators data-type-based
### What changes were proposed in this pull request?
Make unary and comparison operators data-type-based. Refactored operators include:
- Unary operators: `__neg__`, `__abs__`, `__invert__`
- Comparison operators: `>`, `>=`, `<`, `<=`, `==`, `!=`
Non-goals: the tasks below were identified during the development of this PR and are tracked separately.
[[SPARK-35997] Implement comparison operators for CategoricalDtype in pandas API on Spark](https://issues.apache.org/jira/browse/SPARK-35997)
[[SPARK-36000] Support creating a ps.Series/Index with `Decimal('NaN')` with Arrow disabled](https://issues.apache.org/jira/browse/SPARK-36000)
[[SPARK-36001] Assume result's index to be disordered in tests with operations on different Series](https://issues.apache.org/jira/browse/SPARK-36001)
[[SPARK-36002] Consolidate tests for data-type-based operations of decimal Series](https://issues.apache.org/jira/browse/SPARK-36002)
[[SPARK-36003] Implement unary operator `invert` of numeric ps.Series/Index](https://issues.apache.org/jira/browse/SPARK-36003)
### Why are the changes needed?
We have been refactoring basic operators to be data-type-based for readability, flexibility, and extensibility.
Unary and comparison operators are not yet data-type-based; this PR fills that gap.
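The data-type-based design delegates each operator from the Series/Index object to an ops object chosen by the underlying data type. The following is a simplified sketch of that dispatch pattern (illustration only, with hypothetical `MiniSeries`/ops class names; not the actual pyspark.pandas implementation):

```python
# Simplified sketch of data-type-based operator dispatch (hypothetical names,
# not the actual pyspark.pandas code). A Series-like object picks an ops
# object based on its data type and delegates each operator to it.

class DataTypeOps:
    pretty_name = "objects"

    def neg(self, operand):
        # Base class raises a readable TypeError; subclasses override
        # only the operators that make sense for their data type.
        raise TypeError("Unary - can not be applied to %s." % self.pretty_name)

class NumericOps(DataTypeOps):
    pretty_name = "numerics"

    def neg(self, operand):
        return MiniSeries([-v for v in operand.values])

class BinaryOps(DataTypeOps):
    pretty_name = "binaries"  # inherits neg, so it raises a clear TypeError

class MiniSeries:
    def __init__(self, values):
        self.values = values
        # Pick the ops object from the element type (toy dispatch rule).
        if isinstance(values[0], bytes):
            self._dtype_op = BinaryOps()
        else:
            self._dtype_op = NumericOps()

    def __neg__(self):
        return self._dtype_op.neg(self)

print((-MiniSeries([1, 2, 3])).values)  # [-1, -2, -3]
try:
    -MiniSeries([b"2", b"3"])
except TypeError as e:
    print(e)  # Unary - can not be applied to binaries.
```

This is why the error messages improve: the unsupported case is rejected eagerly at the Python layer instead of surfacing later as a Spark `AnalysisException`.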
### Does this PR introduce _any_ user-facing change?
Yes.
- Better error messages. For example,
Before:
```py
>>> import pyspark.pandas as ps
>>> psser = ps.Series([b"2", b"3", b"4"])
>>> -psser
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: cannot resolve '(- `0`)' due to data type mismatch: ...
```
After:
```py
>>> import pyspark.pandas as ps
>>> psser = ps.Series([b"2", b"3", b"4"])
>>> -psser
Traceback (most recent call last):
...
TypeError: Unary - can not be applied to binaries.
>>>
```
- Support unary `-` on `bool` Series. For example,
Before:
```py
>>> psser = ps.Series([True, False, True])
>>> -psser
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: cannot resolve '(- `0`)' due to data type mismatch: ...
```
After:
```py
>>> psser = ps.Series([True, False, True])
>>> -psser
0 False
1 True
2 False
dtype: bool
```
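The new bool behavior follows pandas semantics: unary `-` on a bool Series is logical inversion, and `abs()` is the identity. A rough standalone illustration of those two rules (a hypothetical sketch class, not the actual `BooleanOps` in the diff below):

```python
# Sketch of the boolean-ops semantics added by this PR (hypothetical class,
# not the actual pyspark.pandas BooleanOps): neg is elementwise logical
# inversion and abs is the identity, matching pandas bool Series behavior.

class BooleanOpsSketch:
    def neg(self, values):
        # -True -> False, -False -> True, elementwise
        return [not v for v in values]

    def abs(self, values):
        # abs() of a bool is the bool itself
        return list(values)

ops = BooleanOpsSketch()
print(ops.neg([True, False, True]))  # [False, True, False]
print(ops.abs([True, False, True]))  # [True, False, True]
```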
### How was this patch tested?
Unit tests.
Closes #33162 from xinrong-databricks/datatypeops_refactor.
Authored-by: Xinrong Meng <xi...@databricks.com>
Signed-off-by: Takuya UESHIN <ue...@databricks.com>
(cherry picked from commit 6e4e04f2a13e8983521c230bed2b2b4632977ad1)
Signed-off-by: Takuya UESHIN <ue...@databricks.com>
---
python/pyspark/pandas/base.py | 25 ++--
python/pyspark/pandas/data_type_ops/base.py | 33 ++++-
python/pyspark/pandas/data_type_ops/binary_ops.py | 22 +++-
python/pyspark/pandas/data_type_ops/boolean_ops.py | 48 ++++++-
.../pandas/data_type_ops/categorical_ops.py | 17 ++-
python/pyspark/pandas/data_type_ops/complex_ops.py | 42 ++++++-
python/pyspark/pandas/data_type_ops/date_ops.py | 22 +++-
.../pyspark/pandas/data_type_ops/datetime_ops.py | 22 +++-
python/pyspark/pandas/data_type_ops/null_ops.py | 24 +++-
python/pyspark/pandas/data_type_ops/num_ops.py | 51 +++++++-
python/pyspark/pandas/data_type_ops/string_ops.py | 21 ++++
.../pandas/tests/data_type_ops/test_binary_ops.py | 59 +++++++++
.../pandas/tests/data_type_ops/test_boolean_ops.py | 110 ++++++++++++++++
.../tests/data_type_ops/test_categorical_ops.py | 43 +++++++
.../pandas/tests/data_type_ops/test_complex_ops.py | 91 ++++++++++++++
.../pandas/tests/data_type_ops/test_date_ops.py | 61 +++++++++
.../tests/data_type_ops/test_datetime_ops.py | 59 +++++++++
.../pandas/tests/data_type_ops/test_decimal_ops.py | 54 +++++++-
.../pandas/tests/data_type_ops/test_null_ops.py | 33 +++++
.../pandas/tests/data_type_ops/test_num_ops.py | 140 +++++++++++++++++++++
.../pandas/tests/data_type_ops/test_string_ops.py | 110 ++++++++++++++++
.../pandas/tests/data_type_ops/test_udt_ops.py | 29 +++++
22 files changed, 1095 insertions(+), 21 deletions(-)
diff --git a/python/pyspark/pandas/base.py b/python/pyspark/pandas/base.py
index 6e76311..ff17fdf 100644
--- a/python/pyspark/pandas/base.py
+++ b/python/pyspark/pandas/base.py
@@ -317,7 +317,7 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
# arithmetic operators
def __neg__(self: IndexOpsLike) -> IndexOpsLike:
- return cast(IndexOpsLike, column_op(Column.__neg__)(self))
+ return self._dtype_op.neg(self)
def __add__(self, other: Any) -> SeriesOrIndex:
return self._dtype_op.add(self, other)
@@ -394,22 +394,29 @@ class IndexOpsMixin(object, metaclass=ABCMeta):
return self._dtype_op.rpow(self, other)
def __abs__(self: IndexOpsLike) -> IndexOpsLike:
- return cast(IndexOpsLike, column_op(F.abs)(self))
+ return self._dtype_op.abs(self)
# comparison operators
def __eq__(self, other: Any) -> SeriesOrIndex: # type: ignore[override]
- return column_op(Column.__eq__)(self, other)
+ return self._dtype_op.eq(self, other)
def __ne__(self, other: Any) -> SeriesOrIndex: # type: ignore[override]
- return column_op(Column.__ne__)(self, other)
+ return self._dtype_op.ne(self, other)
- __lt__ = column_op(Column.__lt__)
- __le__ = column_op(Column.__le__)
- __ge__ = column_op(Column.__ge__)
- __gt__ = column_op(Column.__gt__)
+ def __lt__(self, other: Any) -> SeriesOrIndex:
+ return self._dtype_op.lt(self, other)
+
+ def __le__(self, other: Any) -> SeriesOrIndex:
+ return self._dtype_op.le(self, other)
+
+ def __ge__(self, other: Any) -> SeriesOrIndex:
+ return self._dtype_op.ge(self, other)
+
+ def __gt__(self, other: Any) -> SeriesOrIndex:
+ return self._dtype_op.gt(self, other)
def __invert__(self: IndexOpsLike) -> IndexOpsLike:
- return cast(IndexOpsLike, column_op(Column.__invert__)(self))
+ return self._dtype_op.invert(self)
# `and`, `or`, `not` cannot be overloaded in Python,
# so use bitwise operators as boolean operators
diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py
index a345668..71aa943 100644
--- a/python/pyspark/pandas/data_type_ops/base.py
+++ b/python/pyspark/pandas/data_type_ops/base.py
@@ -24,7 +24,7 @@ import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype
-from pyspark.sql import functions as F
+from pyspark.sql import functions as F, Column
from pyspark.sql.types import (
ArrayType,
BinaryType,
@@ -318,6 +318,37 @@ class DataTypeOps(object, metaclass=ABCMeta):
def ror(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
return left.__or__(right)
+ def neg(self, operand: IndexOpsLike) -> IndexOpsLike:
+ raise TypeError("Unary - can not be applied to %s." % self.pretty_name)
+
+ def abs(self, operand: IndexOpsLike) -> IndexOpsLike:
+ raise TypeError("abs() can not be applied to %s." % self.pretty_name)
+
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise TypeError("< can not be applied to %s." % self.pretty_name)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise TypeError("<= can not be applied to %s." % self.pretty_name)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise TypeError(">= can not be applied to %s." % self.pretty_name)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise TypeError("> can not be applied to %s." % self.pretty_name)
+
+ def eq(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__eq__)(left, right)
+
+ def ne(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ne__)(left, right)
+
+ def invert(self, operand: IndexOpsLike) -> IndexOpsLike:
+ raise TypeError("Unary ~ can not be applied to %s." % self.pretty_name)
+
def restore(self, col: pd.Series) -> pd.Series:
"""Restore column when to_pandas."""
return col
diff --git a/python/pyspark/pandas/data_type_ops/binary_ops.py b/python/pyspark/pandas/data_type_ops/binary_ops.py
index 7a80d1d..929bed4 100644
--- a/python/pyspark/pandas/data_type_ops/binary_ops.py
+++ b/python/pyspark/pandas/data_type_ops/binary_ops.py
@@ -30,7 +30,7 @@ from pyspark.pandas.data_type_ops.base import (
)
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.typedef import pandas_on_spark_type
-from pyspark.sql import functions as F
+from pyspark.sql import functions as F, Column
from pyspark.sql.types import BinaryType, BooleanType, StringType
@@ -63,6 +63,26 @@ class BinaryOps(DataTypeOps):
"Concatenation can not be applied to %s and the given type." % self.pretty_name
)
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__lt__)(left, right)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__le__)(left, right)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ge__)(left, right)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__gt__)(left, right)
+
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
dtype, spark_type = pandas_on_spark_type(dtype)
diff --git a/python/pyspark/pandas/data_type_ops/boolean_ops.py b/python/pyspark/pandas/data_type_ops/boolean_ops.py
index ecc0141..a9f9239 100644
--- a/python/pyspark/pandas/data_type_ops/boolean_ops.py
+++ b/python/pyspark/pandas/data_type_ops/boolean_ops.py
@@ -16,7 +16,7 @@
#
import numbers
-from typing import Any, Union
+from typing import cast, Any, Union
import pandas as pd
from pandas.api.types import CategoricalDtype
@@ -47,7 +47,7 @@ class BooleanOps(DataTypeOps):
@property
def pretty_name(self) -> str:
- return "booleans"
+ return "bools"
def add(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if not is_valid_operand_for_numeric_arithmetic(right):
@@ -272,6 +272,37 @@ class BooleanOps(DataTypeOps):
else:
return _as_other_type(index_ops, dtype, spark_type)
+ def neg(self, operand: IndexOpsLike) -> IndexOpsLike:
+ return ~operand
+
+ def abs(self, operand: IndexOpsLike) -> IndexOpsLike:
+ return operand
+
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__lt__)(left, right)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__le__)(left, right)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ge__)(left, right)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__gt__)(left, right)
+
+ def invert(self, operand: IndexOpsLike) -> IndexOpsLike:
+ from pyspark.pandas.base import column_op
+
+ return cast(IndexOpsLike, column_op(Column.__invert__)(operand))
+
class BooleanExtensionOps(BooleanOps):
"""
@@ -279,6 +310,10 @@ class BooleanExtensionOps(BooleanOps):
and dtype BooleanDtype.
"""
+ @property
+ def pretty_name(self) -> str:
+ return "booleans"
+
def __and__(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
def and_func(left: Column, right: Any) -> Column:
if not isinstance(right, Column):
@@ -304,3 +339,12 @@ class BooleanExtensionOps(BooleanOps):
def restore(self, col: pd.Series) -> pd.Series:
"""Restore column when to_pandas."""
return col.astype(self.dtype)
+
+ def neg(self, operand: IndexOpsLike) -> IndexOpsLike:
+ raise TypeError("Unary - can not be applied to %s." % self.pretty_name)
+
+ def invert(self, operand: IndexOpsLike) -> IndexOpsLike:
+ raise TypeError("Unary ~ can not be applied to %s." % self.pretty_name)
+
+ def abs(self, operand: IndexOpsLike) -> IndexOpsLike:
+ raise TypeError("abs() can not be applied to %s." % self.pretty_name)
diff --git a/python/pyspark/pandas/data_type_ops/categorical_ops.py b/python/pyspark/pandas/data_type_ops/categorical_ops.py
index bda98be..8c2a27b 100644
--- a/python/pyspark/pandas/data_type_ops/categorical_ops.py
+++ b/python/pyspark/pandas/data_type_ops/categorical_ops.py
@@ -16,12 +16,12 @@
#
from itertools import chain
-from typing import Union
+from typing import Any, Union
import pandas as pd
from pandas.api.types import CategoricalDtype
-from pyspark.pandas._typing import Dtype, IndexOpsLike
+from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
from pyspark.pandas.data_type_ops.base import DataTypeOps
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.typedef import pandas_on_spark_type
@@ -65,3 +65,16 @@ class CategoricalOps(DataTypeOps):
return index_ops._with_new_scol(
scol.alias(index_ops._internal.data_spark_column_names[0])
).astype(dtype)
+
+ # TODO(SPARK-35997): Implement comparison operators below
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise NotImplementedError("< can not be applied to %s." % self.pretty_name)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise NotImplementedError("<= can not be applied to %s." % self.pretty_name)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise NotImplementedError(">= can not be applied to %s." % self.pretty_name)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise NotImplementedError("> can not be applied to %s." % self.pretty_name)
diff --git a/python/pyspark/pandas/data_type_ops/complex_ops.py b/python/pyspark/pandas/data_type_ops/complex_ops.py
index df7b570..41f79d4 100644
--- a/python/pyspark/pandas/data_type_ops/complex_ops.py
+++ b/python/pyspark/pandas/data_type_ops/complex_ops.py
@@ -29,7 +29,7 @@ from pyspark.pandas.data_type_ops.base import (
_as_string_type,
)
from pyspark.pandas.typedef import pandas_on_spark_type
-from pyspark.sql import functions as F
+from pyspark.sql import functions as F, Column
from pyspark.sql.types import ArrayType, BooleanType, NumericType, StringType
@@ -62,6 +62,26 @@ class ArrayOps(DataTypeOps):
return column_op(F.concat)(left, right)
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__lt__)(left, right)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__le__)(left, right)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ge__)(left, right)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__gt__)(left, right)
+
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
dtype, spark_type = pandas_on_spark_type(dtype)
@@ -93,3 +113,23 @@ class StructOps(DataTypeOps):
@property
def pretty_name(self) -> str:
return "structs"
+
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__lt__)(left, right)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__le__)(left, right)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ge__)(left, right)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__gt__)(left, right)
diff --git a/python/pyspark/pandas/data_type_ops/date_ops.py b/python/pyspark/pandas/data_type_ops/date_ops.py
index 841c4e1..9cdd0e5 100644
--- a/python/pyspark/pandas/data_type_ops/date_ops.py
+++ b/python/pyspark/pandas/data_type_ops/date_ops.py
@@ -22,7 +22,7 @@ from typing import Any, Union
import pandas as pd
from pandas.api.types import CategoricalDtype
-from pyspark.sql import functions as F
+from pyspark.sql import functions as F, Column
from pyspark.sql.types import BooleanType, DateType, StringType
from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
@@ -78,6 +78,26 @@ class DateOps(DataTypeOps):
else:
raise TypeError("date subtraction can only be applied to date series.")
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__lt__)(left, right)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__le__)(left, right)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ge__)(left, right)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__gt__)(left, right)
+
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
dtype, spark_type = pandas_on_spark_type(dtype)
diff --git a/python/pyspark/pandas/data_type_ops/datetime_ops.py b/python/pyspark/pandas/data_type_ops/datetime_ops.py
index 626c25f..3b7220a 100644
--- a/python/pyspark/pandas/data_type_ops/datetime_ops.py
+++ b/python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -22,7 +22,7 @@ from typing import Any, Union, cast
import pandas as pd
from pandas.api.types import CategoricalDtype
-from pyspark.sql import functions as F
+from pyspark.sql import functions as F, Column
from pyspark.sql.types import BooleanType, StringType, TimestampType
from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
@@ -88,6 +88,26 @@ class DatetimeOps(DataTypeOps):
else:
raise TypeError("datetime subtraction can only be applied to datetime series.")
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__lt__)(left, right)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__le__)(left, right)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ge__)(left, right)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__gt__)(left, right)
+
def prepare(self, col: pd.Series) -> pd.Series:
"""Prepare column when from_pandas."""
return col
diff --git a/python/pyspark/pandas/data_type_ops/null_ops.py b/python/pyspark/pandas/data_type_ops/null_ops.py
index 694fdc9..f26de6f 100644
--- a/python/pyspark/pandas/data_type_ops/null_ops.py
+++ b/python/pyspark/pandas/data_type_ops/null_ops.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from typing import Union
+from typing import Any, Union
from pandas.api.types import CategoricalDtype
@@ -27,7 +27,9 @@ from pyspark.pandas.data_type_ops.base import (
_as_other_type,
_as_string_type,
)
+from pyspark.pandas._typing import SeriesOrIndex
from pyspark.pandas.typedef import pandas_on_spark_type
+from pyspark.sql import Column
from pyspark.sql.types import BooleanType, StringType
@@ -40,6 +42,26 @@ class NullOps(DataTypeOps):
def pretty_name(self) -> str:
return "nulls"
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__lt__)(left, right)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__le__)(left, right)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ge__)(left, right)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__gt__)(left, right)
+
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
dtype, spark_type = pandas_on_spark_type(dtype)
diff --git a/python/pyspark/pandas/data_type_ops/num_ops.py b/python/pyspark/pandas/data_type_ops/num_ops.py
index 012b8dc..0edf451 100644
--- a/python/pyspark/pandas/data_type_ops/num_ops.py
+++ b/python/pyspark/pandas/data_type_ops/num_ops.py
@@ -16,7 +16,7 @@
#
import numbers
-from typing import Any, Union
+from typing import cast, Any, Union
import numpy as np
import pandas as pd
@@ -158,6 +158,40 @@ class NumericOps(DataTypeOps):
right = transform_boolean_operand_to_numeric(right)
return column_op(rmod)(left, right)
+ # TODO(SPARK-36003): Implement unary operator `invert` as below
+ def invert(self, operand: IndexOpsLike) -> IndexOpsLike:
+ raise NotImplementedError("Unary ~ can not be applied to %s." % self.pretty_name)
+
+ def neg(self, operand: IndexOpsLike) -> IndexOpsLike:
+ from pyspark.pandas.base import column_op
+
+ return cast(IndexOpsLike, column_op(Column.__neg__)(operand))
+
+ def abs(self, operand: IndexOpsLike) -> IndexOpsLike:
+ from pyspark.pandas.base import column_op
+
+ return cast(IndexOpsLike, column_op(F.abs)(operand))
+
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__lt__)(left, right)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__le__)(left, right)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ge__)(left, right)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__gt__)(left, right)
+
class IntegralOps(NumericOps):
"""
@@ -406,6 +440,21 @@ class DecimalOps(FractionalOps):
def pretty_name(self) -> str:
return "decimal"
+ def invert(self, operand: IndexOpsLike) -> IndexOpsLike:
+ raise TypeError("Unary ~ can not be applied to %s." % self.pretty_name)
+
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise TypeError("< can not be applied to %s." % self.pretty_name)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise TypeError("<= can not be applied to %s." % self.pretty_name)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise TypeError(">= can not be applied to %s." % self.pretty_name)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ raise TypeError(">= can not be applied to %s." % self.pretty_name)
+
def isnull(self, index_ops: IndexOpsLike) -> IndexOpsLike:
return index_ops._with_new_scol(
index_ops.spark.column.isNull(),
diff --git a/python/pyspark/pandas/data_type_ops/string_ops.py b/python/pyspark/pandas/data_type_ops/string_ops.py
index 6acdff3..4729582 100644
--- a/python/pyspark/pandas/data_type_ops/string_ops.py
+++ b/python/pyspark/pandas/data_type_ops/string_ops.py
@@ -34,6 +34,7 @@ from pyspark.pandas.data_type_ops.base import (
from pyspark.pandas.internal import InternalField
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type
+from pyspark.sql import Column
from pyspark.sql.types import BooleanType
@@ -112,6 +113,26 @@ class StringOps(DataTypeOps):
def rmod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
raise TypeError("modulo can not be applied on string series or literals.")
+ def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__lt__)(left, right)
+
+ def le(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__le__)(left, right)
+
+ def ge(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__ge__)(left, right)
+
+ def gt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+ from pyspark.pandas.base import column_op
+
+ return column_op(Column.__gt__)(left, right)
+
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
dtype, spark_type = pandas_on_spark_type(dtype)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_binary_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_binary_ops.py
index e7cd668..9a52a6f 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_binary_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_binary_ops.py
@@ -33,6 +33,14 @@ class BinaryOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def psser(self):
return ps.from_pandas(self.pser)
+ @property
+ def other_pser(self):
+ return pd.Series([b"2", b"3", b"4"])
+
+ @property
+ def other_psser(self):
+ return ps.from_pandas(self.other_pser)
+
def test_add(self):
psser = self.psser
pser = self.pser
@@ -159,6 +167,57 @@ class BinaryOpsTest(PandasOnSparkTestCase, TestCasesUtils):
cat_type = CategoricalDtype(categories=[b"2", b"3", b"1"])
self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
+ def test_neg(self):
+ self.assertRaises(TypeError, lambda: -self.psser)
+
+ def test_abs(self):
+ self.assertRaises(TypeError, lambda: abs(self.psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser == self.other_pser, (self.psser == self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser != self.other_pser, (self.psser != self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser < self.other_pser, (self.psser < self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser < self.pser, (self.psser < self.psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser <= self.other_pser, (self.psser <= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser <= self.pser, (self.psser <= self.psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser > self.other_pser, (self.psser > self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser > self.pser, (self.psser > self.psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser >= self.other_pser, (self.psser >= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser >= self.pser, (self.psser >= self.psser).sort_index())
+
if __name__ == "__main__":
import unittest
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
index e25e7f1..d714d7e 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
@@ -48,6 +48,14 @@ class BooleanOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def float_psser(self):
return ps.from_pandas(self.float_pser)
+ @property
+ def other_pser(self):
+ return pd.Series([False, False, True])
+
+ @property
+ def other_psser(self):
+ return ps.from_pandas(self.other_pser)
+
def test_add(self):
pser = self.pser
psser = self.psser
@@ -306,6 +314,57 @@ class BooleanOpsTest(PandasOnSparkTestCase, TestCasesUtils):
cat_type = CategoricalDtype(categories=[False, True])
self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
+ def test_neg(self):
+ self.assert_eq(-self.pser, -self.psser)
+
+ def test_abs(self):
+ self.assert_eq(abs(self.pser), abs(self.psser))
+
+ def test_invert(self):
+ self.assert_eq(~self.pser, ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser == self.other_pser, (self.psser == self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser != self.other_pser, (self.psser != self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser < self.other_pser, (self.psser < self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser < self.pser, (self.psser < self.psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser <= self.other_pser, (self.psser <= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser <= self.pser, (self.psser <= self.psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser > self.other_pser, (self.psser > self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser > self.pser, (self.psser > self.psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser >= self.other_pser, (self.psser >= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser >= self.pser, (self.psser >= self.psser).sort_index())
+
@unittest.skipIf(
not extension_object_dtypes_available, "pandas extension object dtypes are not available"
@@ -586,6 +645,57 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
else:
self.check_extension(pser.astype(dtype), psser.astype(dtype))
+ def test_neg(self):
+ self.assertRaises(TypeError, lambda: -self.psser)
+
+ def test_abs(self):
+ self.assertRaises(TypeError, lambda: abs(self.psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser == self.other_pser, (self.psser == self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser != self.other_pser, (self.psser != self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser < self.other_pser, (self.psser < self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser < self.pser, (self.psser < self.psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser <= self.other_pser, (self.psser <= self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser <= self.pser, (self.psser <= self.psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser > self.other_pser, (self.psser > self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser > self.pser, (self.psser > self.psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser >= self.other_pser, (self.psser >= self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser >= self.pser, (self.psser >= self.psser).sort_index())
+
if __name__ == "__main__":
from pyspark.pandas.tests.data_type_ops.test_boolean_ops import * # noqa: F401
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
index 53db89d..c0fb240 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_categorical_ops.py
@@ -36,6 +36,14 @@ class CategoricalOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def psser(self):
return ps.from_pandas(self.pser)
+ @property
+ def other_pser(self):
+ return pd.Series(["y", "x", 1], dtype="category")
+
+ @property
+ def other_psser(self):
+ return ps.from_pandas(self.other_pser)
+
def test_add(self):
self.assertRaises(TypeError, lambda: self.psser + "x")
self.assertRaises(TypeError, lambda: self.psser + 1)
@@ -166,6 +174,41 @@ class CategoricalOpsTest(PandasOnSparkTestCase, TestCasesUtils):
else:
self.assert_eq(pd.Series(data).astype(cat_type), psser.astype(cat_type))
+ def test_neg(self):
+ self.assertRaises(TypeError, lambda: -self.psser)
+
+ def test_abs(self):
+ self.assertRaises(TypeError, lambda: abs(self.psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser == self.other_pser, (self.psser == self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser != self.other_pser, (self.psser != self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ self.assertRaises(NotImplementedError, lambda: self.psser < self.other_psser)
+
+ def test_le(self):
+ self.assertRaises(NotImplementedError, lambda: self.psser <= self.other_psser)
+
+ def test_gt(self):
+ self.assertRaises(NotImplementedError, lambda: self.psser > self.other_psser)
+
+ def test_ge(self):
+ self.assertRaises(NotImplementedError, lambda: self.psser >= self.other_psser)
+
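The `NotImplementedError` expectations above mirror plain pandas, where ordering comparisons on unordered categoricals are rejected while equality remains defined; support for ordered categories is deferred to SPARK-35997. A pandas-only sketch with hypothetical series:

```python
import pandas as pd

pser = pd.Series(["x", "y", "z"], dtype="category")
other = pd.Series(["y", "x", "z"], dtype="category")

# Equality comparison is defined for categoricals sharing the same categories.
assert (pser == other).tolist() == [False, False, True]

# Ordering comparisons on unordered categoricals raise TypeError in pandas.
try:
    pser < other
    raised = False
except TypeError:
    raised = True
assert raised
```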
if __name__ == "__main__":
import unittest
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py
index f13f957..c62ec3f 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_complex_ops.py
@@ -73,6 +73,22 @@ class ComplexOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def psser(self):
return ps.from_pandas(self.pser)
+ @property
+ def other_pser(self):
+ return pd.Series([[2, 3, 4]])
+
+ @property
+ def other_psser(self):
+ return ps.from_pandas(self.other_pser)
+
+ @property
+ def struct_pser(self):
+ return pd.Series([("x", 1)])
+
+ @property
+ def struct_psser(self):
+ return ps.Index([("x", 1)]).to_series().reset_index(drop=True)
+
def test_add(self):
for pser, psser in zip(self.psers, self.pssers):
self.assert_eq(pser + pser, psser + psser)
@@ -224,6 +240,81 @@ class ComplexOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def test_astype(self):
self.assert_eq(self.pser.astype(str), self.psser.astype(str))
+ def test_neg(self):
+ self.assertRaises(TypeError, lambda: -self.psser)
+
+ def test_abs(self):
+ self.assertRaises(TypeError, lambda: abs(self.psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser == self.other_pser, (self.psser == self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser == self.pser, (self.psser == self.psser).sort_index())
+ self.assert_eq(
+ self.struct_pser == self.struct_pser,
+ (self.struct_psser == self.struct_psser).sort_index(),
+ )
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser != self.other_pser, (self.psser != self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser != self.pser, (self.psser != self.psser).sort_index())
+ self.assert_eq(
+ self.struct_pser != self.struct_pser,
+ (self.struct_psser != self.struct_psser).sort_index(),
+ )
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser < self.other_pser, (self.psser < self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser < self.pser, (self.psser < self.psser).sort_index())
+ self.assert_eq(
+ self.struct_pser < self.struct_pser,
+ (self.struct_psser < self.struct_psser).sort_index(),
+ )
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser <= self.other_pser, (self.psser <= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser <= self.pser, (self.psser <= self.psser).sort_index())
+ self.assert_eq(
+ self.struct_pser <= self.struct_pser,
+ (self.struct_psser <= self.struct_psser).sort_index(),
+ )
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser > self.other_pser, (self.psser > self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser > self.pser, (self.psser > self.psser).sort_index())
+ self.assert_eq(
+ self.struct_pser > self.struct_pser,
+ (self.struct_psser > self.struct_psser).sort_index(),
+ )
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser >= self.other_pser, (self.psser >= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser >= self.pser, (self.psser >= self.psser).sort_index())
+ self.assert_eq(
+ self.struct_pser >= self.struct_pser,
+ (self.struct_psser >= self.struct_psser).sort_index(),
+ )
+
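The list and struct cases above lean on the fact that, in pandas, object-dtype Series of sequences compare element-wise by delegating to Python's lexicographic sequence comparison. A minimal pandas-only sketch, with values chosen for illustration:

```python
import pandas as pd

a = pd.Series([[1, 2, 3]])
b = pd.Series([[2, 3, 4]])

# Element-wise equality of the underlying lists.
assert (a == b).tolist() == [False]

# Ordering delegates to list comparison: [1, 2, 3] < [2, 3, 4] lexicographically.
assert (a < b).tolist() == [True]
```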
if __name__ == "__main__":
import unittest
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
index 1b5c3ce..edfe806 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
@@ -40,6 +40,16 @@ class DateOpsTest(PandasOnSparkTestCase, TestCasesUtils):
return ps.from_pandas(self.pser)
@property
+ def other_pser(self):
+ return pd.Series(
+ [datetime.date(2000, 1, 31), datetime.date(1994, 3, 1), datetime.date(1990, 2, 2)]
+ )
+
+ @property
+ def other_psser(self):
+ return ps.from_pandas(self.other_pser)
+
+ @property
def some_date(self):
return datetime.date(1994, 1, 1)
@@ -184,6 +194,57 @@ class DateOpsTest(PandasOnSparkTestCase, TestCasesUtils):
cat_type = CategoricalDtype(categories=["a", "b", "c"])
self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
+ def test_neg(self):
+ self.assertRaises(TypeError, lambda: -self.psser)
+
+ def test_abs(self):
+ self.assertRaises(TypeError, lambda: abs(self.psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser == self.other_pser, (self.psser == self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser != self.other_pser, (self.psser != self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser < self.other_pser, (self.psser < self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser < self.pser, (self.psser < self.psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser <= self.other_pser, (self.psser <= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser <= self.pser, (self.psser <= self.psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser > self.other_pser, (self.psser > self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser > self.pser, (self.psser > self.psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser >= self.other_pser, (self.psser >= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser >= self.pser, (self.psser >= self.psser).sort_index())
+
if __name__ == "__main__":
import unittest
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py
index c0db234..8b2a0f9 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_datetime_ops.py
@@ -37,6 +37,14 @@ class DatetimeOpsTest(PandasOnSparkTestCase, TestCasesUtils):
return ps.from_pandas(self.pser)
@property
+ def other_pser(self):
+ return pd.Series(pd.date_range("1994-4-30 10:30:15", periods=3, freq="M"))
+
+ @property
+ def other_psser(self):
+ return ps.from_pandas(self.other_pser)
+
+ @property
def some_datetime(self):
return datetime.datetime(1994, 1, 31, 10, 30, 00)
@@ -184,6 +192,57 @@ class DatetimeOpsTest(PandasOnSparkTestCase, TestCasesUtils):
cat_type = CategoricalDtype(categories=["a", "b", "c"])
self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
+ def test_neg(self):
+ self.assertRaises(TypeError, lambda: -self.psser)
+
+ def test_abs(self):
+ self.assertRaises(TypeError, lambda: abs(self.psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser == self.other_pser, (self.psser == self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser != self.other_pser, (self.psser != self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser < self.other_pser, (self.psser < self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser < self.pser, (self.psser < self.psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser <= self.other_pser, (self.psser <= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser <= self.pser, (self.psser <= self.psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser > self.other_pser, (self.psser > self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser > self.pser, (self.psser > self.psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser >= self.other_pser, (self.psser >= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser >= self.pser, (self.psser >= self.psser).sort_index())
+
if __name__ == "__main__":
import unittest
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_decimal_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_decimal_ops.py
index 7aade73..15bbb69 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_decimal_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_decimal_ops.py
@@ -21,8 +21,9 @@ import unittest
import numpy as np
import pandas as pd
-from pyspark.pandas.data_type_ops.num_ops import DecimalOps
from pyspark import pandas as ps
+from pyspark.pandas.config import option_context
+from pyspark.pandas.data_type_ops.num_ops import DecimalOps
from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
from pyspark.testing.pandasutils import PandasOnSparkTestCase
@@ -37,6 +38,14 @@ class DecimalOpsTest(PandasOnSparkTestCase, TestCasesUtils):
return ps.from_pandas(self.decimal_pser)
@property
+ def other_decimal_pser(self):
+ return pd.Series([d.Decimal(2.0), d.Decimal(1.0), d.Decimal(-3.0)])
+
+ @property
+ def other_decimal_psser(self):
+ return ps.from_pandas(self.other_decimal_pser)
+
+ @property
def float_pser(self):
return pd.Series([1, 2, np.nan], dtype=float)
@@ -54,6 +63,49 @@ class DecimalOpsTest(PandasOnSparkTestCase, TestCasesUtils):
self.assert_eq(self.decimal_pser.isnull(), self.decimal_psser.isnull())
self.assert_eq(self.float_pser.isnull(), self.float_psser.isnull())
+ def test_neg(self):
+ self.assert_eq(-self.other_decimal_pser, -self.other_decimal_psser)
+
+ def test_abs(self):
+ self.assert_eq(abs(self.other_decimal_pser), abs(self.other_decimal_psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.decimal_psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.decimal_pser == self.other_decimal_pser,
+ (self.decimal_psser == self.other_decimal_psser).sort_index(),
+ )
+ self.assert_eq(
+ self.decimal_pser == self.decimal_pser,
+ (self.decimal_psser == self.decimal_psser).sort_index(),
+ )
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.decimal_pser != self.other_decimal_pser,
+ (self.decimal_psser != self.other_decimal_psser).sort_index(),
+ )
+ self.assert_eq(
+ self.decimal_pser != self.decimal_pser,
+ (self.decimal_psser != self.decimal_psser).sort_index(),
+ )
+
+ def test_lt(self):
+ self.assertRaises(TypeError, lambda: self.decimal_psser < self.other_decimal_psser)
+
+ def test_le(self):
+ self.assertRaises(TypeError, lambda: self.decimal_psser <= self.other_decimal_psser)
+
+ def test_gt(self):
+ self.assertRaises(TypeError, lambda: self.decimal_psser > self.other_decimal_psser)
+
+ def test_ge(self):
+ self.assertRaises(TypeError, lambda: self.decimal_psser >= self.other_decimal_psser)
+
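The decimal tests rely on object-dtype Series applying unary operators element-wise via `decimal.Decimal`, while ordering comparisons against a Spark `DecimalType` column raise TypeError (test consolidation is tracked in SPARK-36002). A pandas-only sketch of the unary behavior:

```python
import decimal as d

import pandas as pd

pser = pd.Series([d.Decimal("2.0"), d.Decimal("1.0"), d.Decimal("-3.0")])

# Unary minus and abs() apply per element, preserving Decimal values.
assert (-pser).tolist() == [d.Decimal("-2.0"), d.Decimal("-1.0"), d.Decimal("3.0")]
assert abs(pser).tolist() == [d.Decimal("2.0"), d.Decimal("1.0"), d.Decimal("3.0")]
```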
if __name__ == "__main__":
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_null_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_null_ops.py
index 8ab9793..a7f0b6c 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_null_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_null_ops.py
@@ -135,6 +135,39 @@ class NullOpsTest(PandasOnSparkTestCase, TestCasesUtils):
cat_type = CategoricalDtype(categories=[1, 2, 3])
self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
+ def test_neg(self):
+ self.assertRaises(TypeError, lambda: -self.psser)
+
+ def test_abs(self):
+ self.assertRaises(TypeError, lambda: abs(self.psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(self.pser < self.pser, (self.psser < self.psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(self.pser <= self.pser, (self.psser <= self.psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(self.pser > self.pser, (self.psser > self.psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(self.pser >= self.pser, (self.psser >= self.psser).sort_index())
+
if __name__ == "__main__":
import unittest
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
index d607e22..80f412f 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
@@ -30,6 +30,7 @@ from pyspark.pandas.typedef.typehints import (
extension_dtypes_available,
extension_float_dtypes_available,
)
+from pyspark.sql.types import DecimalType
from pyspark.testing.pandasutils import PandasOnSparkTestCase
@@ -317,6 +318,57 @@ class NumOpsTest(PandasOnSparkTestCase, TestCasesUtils):
cat_type = CategoricalDtype(categories=[2, 1, 3])
self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
+ def test_neg(self):
+ for pser, psser in self.numeric_pser_psser_pairs:
+ if not isinstance(psser.spark.data_type, DecimalType):
+ self.assert_eq(-pser, -psser)
+
+ def test_abs(self):
+ for pser, psser in self.numeric_pser_psser_pairs:
+ if not isinstance(psser.spark.data_type, DecimalType):
+ self.assert_eq(abs(pser), abs(psser))
+
+ def test_invert(self):
+ for psser in self.numeric_pssers:
+ if not isinstance(psser.spark.data_type, DecimalType):
+ self.assertRaises(NotImplementedError, lambda: ~psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.numeric_pser_psser_pairs:
+ if not isinstance(psser.spark.data_type, DecimalType):
+ self.assert_eq(pser == pser, (psser == psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.numeric_pser_psser_pairs:
+ if not isinstance(psser.spark.data_type, DecimalType):
+ self.assert_eq(pser != pser, (psser != psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.numeric_pser_psser_pairs:
+ if not isinstance(psser.spark.data_type, DecimalType):
+ self.assert_eq(pser < pser, (psser < psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.numeric_pser_psser_pairs:
+ if not isinstance(psser.spark.data_type, DecimalType):
+ self.assert_eq(pser <= pser, (psser <= psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.numeric_pser_psser_pairs:
+ if not isinstance(psser.spark.data_type, DecimalType):
+ self.assert_eq(pser > pser, (psser > psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.numeric_pser_psser_pairs:
+ if not isinstance(psser.spark.data_type, DecimalType):
+ self.assert_eq(pser >= pser, (psser >= psser).sort_index())
+
@unittest.skipIf(not extension_dtypes_available, "pandas extension dtypes are not available")
class IntegralExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
@@ -346,6 +398,48 @@ class IntegralExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
for dtype in self.extension_dtypes:
self.check_extension(pser.astype(dtype), psser.astype(dtype))
+ def test_neg(self):
+ for pser, psser in self.intergral_extension_pser_psser_pairs:
+ self.check_extension(-pser, -psser)
+
+ def test_abs(self):
+ for pser, psser in self.intergral_extension_pser_psser_pairs:
+ self.check_extension(abs(pser), abs(psser))
+
+ def test_invert(self):
+ for psser in self.intergral_extension_pssers:
+ self.assertRaises(NotImplementedError, lambda: ~psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.intergral_extension_pser_psser_pairs:
+ self.check_extension(pser == pser, (psser == psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.intergral_extension_pser_psser_pairs:
+ self.check_extension(pser != pser, (psser != psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.intergral_extension_pser_psser_pairs:
+ self.check_extension(pser < pser, (psser < psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.intergral_extension_pser_psser_pairs:
+ self.check_extension(pser <= pser, (psser <= psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.intergral_extension_pser_psser_pairs:
+ self.check_extension(pser > pser, (psser > psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.intergral_extension_pser_psser_pairs:
+ self.check_extension(pser >= pser, (psser >= psser).sort_index())
+
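`check_extension` is used above because, under pandas' nullable extension dtypes, comparisons return the `boolean` extension dtype, in which missing values propagate as `pd.NA` rather than collapsing to False. A pandas-only sketch with a hypothetical nullable series:

```python
import pandas as pd

s = pd.Series([1, 2, None], dtype="Int64")
result = s == s

# Comparisons on nullable dtypes yield the "boolean" extension dtype,
# and the missing entry propagates as pd.NA instead of becoming False.
assert str(result.dtype) == "boolean"
assert result[0] and result[1]
assert result[2] is pd.NA
```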
@unittest.skipIf(
not extension_float_dtypes_available, "pandas extension float dtypes are not available"
@@ -380,6 +474,52 @@ class FractionalExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
for dtype in self.extension_dtypes:
self.check_extension(pser.astype(dtype), psser.astype(dtype))
+ def test_neg(self):
+ # pandas raises "TypeError: bad operand type for unary -: 'FloatingArray'"
+ for dtype in self.fractional_extension_dtypes:
+ self.assert_eq(
+ ps.Series([-0.1, -0.2, -0.3, None], dtype=dtype),
+ -ps.Series([0.1, 0.2, 0.3, None], dtype=dtype),
+ )
+
+ def test_abs(self):
+ for pser, psser in self.fractional_extension_pser_psser_pairs:
+ self.check_extension(abs(pser), abs(psser))
+
+ def test_invert(self):
+ for psser in self.fractional_extension_pssers:
+ self.assertRaises(NotImplementedError, lambda: ~psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.fractional_extension_pser_psser_pairs:
+ self.check_extension(pser == pser, (psser == psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.fractional_extension_pser_psser_pairs:
+ self.check_extension(pser != pser, (psser != psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.fractional_extension_pser_psser_pairs:
+ self.check_extension(pser < pser, (psser < psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.fractional_extension_pser_psser_pairs:
+ self.check_extension(pser <= pser, (psser <= psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.fractional_extension_pser_psser_pairs:
+ self.check_extension(pser > pser, (psser > psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ for pser, psser in self.fractional_extension_pser_psser_pairs:
+ self.check_extension(pser >= pser, (psser >= psser).sort_index())
+
if __name__ == "__main__":
from pyspark.pandas.tests.data_type_ops.test_num_ops import * # noqa: F401
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py
index 040065f..973d0bd 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_string_ops.py
@@ -40,6 +40,14 @@ class StringOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def psser(self):
return ps.from_pandas(self.pser)
+ @property
+ def other_pser(self):
+ return pd.Series(["z", "y", "x"])
+
+ @property
+ def other_psser(self):
+ return ps.from_pandas(self.other_pser)
+
def test_add(self):
self.assert_eq(self.pser + "x", self.psser + "x")
self.assertRaises(TypeError, lambda: self.psser + 1)
@@ -179,6 +187,57 @@ class StringOpsTest(PandasOnSparkTestCase, TestCasesUtils):
cat_type = CategoricalDtype(categories=["3", "1", "2"])
self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
+ def test_neg(self):
+ self.assertRaises(TypeError, lambda: -self.psser)
+
+ def test_abs(self):
+ self.assertRaises(TypeError, lambda: abs(self.psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser == self.other_pser, (self.psser == self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser != self.other_pser, (self.psser != self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser < self.other_pser, (self.psser < self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser < self.pser, (self.psser < self.psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser <= self.other_pser, (self.psser <= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser <= self.pser, (self.psser <= self.psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser > self.other_pser, (self.psser > self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser > self.pser, (self.psser > self.psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(
+ self.pser >= self.other_pser, (self.psser >= self.other_psser).sort_index()
+ )
+ self.assert_eq(self.pser >= self.pser, (self.psser >= self.psser).sort_index())
+
@unittest.skipIf(
not extension_object_dtypes_available, "pandas extension object dtypes are not available"
@@ -192,6 +251,14 @@ class StringExtensionOpsTest(StringOpsTest, PandasOnSparkTestCase, TestCasesUtil
def psser(self):
return ps.from_pandas(self.pser)
+ @property
+ def other_pser(self):
+ return pd.Series([None, "z", "y", "x"], dtype="string")
+
+ @property
+ def other_psser(self):
+ return ps.from_pandas(self.other_pser)
+
def test_radd(self):
self.assert_eq("x" + self.pser, ("x" + self.psser).astype("string"))
self.assertRaises(TypeError, lambda: 1 + self.psser)
@@ -234,8 +301,51 @@ class StringExtensionOpsTest(StringOpsTest, PandasOnSparkTestCase, TestCasesUtil
if dtype in ["string", StringDtype()]:
self.check_extension(pser.astype(dtype), psser.astype(dtype))
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser == self.other_pser, (self.psser == self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser != self.other_pser, (self.psser != self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser < self.other_pser, (self.psser < self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser < self.pser, (self.psser < self.psser).sort_index())
+
+ def test_le(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser <= self.other_pser, (self.psser <= self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser <= self.pser, (self.psser <= self.psser).sort_index())
+
+ def test_gt(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser > self.other_pser, (self.psser > self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser > self.pser, (self.psser > self.psser).sort_index())
+
+ def test_ge(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.check_extension(
+ self.pser >= self.other_pser, (self.psser >= self.other_psser).sort_index()
+ )
+ self.check_extension(self.pser >= self.pser, (self.psser >= self.psser).sort_index())
+
if __name__ == "__main__":
+
from pyspark.pandas.tests.data_type_ops.test_string_ops import * # noqa: F401
try:
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_udt_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_udt_ops.py
index 156b8d3..d30cdd4 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_udt_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_udt_ops.py
@@ -131,6 +131,35 @@ class UDTOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def test_astype(self):
self.assertRaises(TypeError, lambda: self.psser.astype(str))
+ def test_neg(self):
+ self.assertRaises(TypeError, lambda: -self.psser)
+
+ def test_abs(self):
+ self.assertRaises(TypeError, lambda: abs(self.psser))
+
+ def test_invert(self):
+ self.assertRaises(TypeError, lambda: ~self.psser)
+
+ def test_eq(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(self.pser == self.pser, (self.psser == self.psser).sort_index())
+
+ def test_ne(self):
+ with option_context("compute.ops_on_diff_frames", True):
+ self.assert_eq(self.pser != self.pser, (self.psser != self.psser).sort_index())
+
+ def test_lt(self):
+ self.assertRaises(TypeError, lambda: self.psser < self.psser)
+
+ def test_le(self):
+ self.assertRaises(TypeError, lambda: self.psser <= self.psser)
+
+ def test_gt(self):
+ self.assertRaises(TypeError, lambda: self.psser > self.psser)
+
+ def test_ge(self):
+ self.assertRaises(TypeError, lambda: self.psser >= self.psser)
+
if __name__ == "__main__":
import unittest