You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/12 02:55:40 UTC
[spark] branch master updated: [SPARK-36064][PYTHON] Manage
InternalField more in DataTypeOps
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 95e6c6e [SPARK-36064][PYTHON] Manage InternalField more in DataTypeOps
95e6c6e is described below
commit 95e6c6e3e950b95b2b303d2fd5d859f752e2ca5e
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Mon Jul 12 11:55:05 2021 +0900
[SPARK-36064][PYTHON] Manage InternalField more in DataTypeOps
### What changes were proposed in this pull request?
Properly set `InternalField` in more places in `DataTypeOps`.
### Why are the changes needed?
There are more places in `DataTypeOps` where we can manage `InternalField`.
We should manage `InternalField` for these cases.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #33275 from ueshin/issues/SPARK-36064/fields.
Authored-by: Takuya UESHIN <ue...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/pandas/data_type_ops/base.py | 39 +++----
python/pyspark/pandas/data_type_ops/boolean_ops.py | 64 +++++------
.../pandas/data_type_ops/categorical_ops.py | 6 +-
python/pyspark/pandas/data_type_ops/date_ops.py | 9 +-
.../pyspark/pandas/data_type_ops/datetime_ops.py | 22 ++--
python/pyspark/pandas/data_type_ops/num_ops.py | 43 +++-----
python/pyspark/pandas/data_type_ops/string_ops.py | 38 +++++--
.../pandas/tests/data_type_ops/test_boolean_ops.py | 118 +++++++++++++++------
.../pandas/tests/data_type_ops/test_date_ops.py | 2 +-
.../pandas/tests/data_type_ops/testing_utils.py | 9 +-
10 files changed, 212 insertions(+), 138 deletions(-)
diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py
index 71aa943..c79408b 100644
--- a/python/pyspark/pandas/data_type_ops/base.py
+++ b/python/pyspark/pandas/data_type_ops/base.py
@@ -49,6 +49,7 @@ from pyspark.pandas.typedef.typehints import (
extension_dtypes_available,
extension_float_dtypes_available,
extension_object_dtypes_available,
+ spark_type_to_pandas_dtype,
)
if extension_dtypes_available:
@@ -79,7 +80,7 @@ def is_valid_operand_for_numeric_arithmetic(operand: Any, *, allow_bool: bool =
def transform_boolean_operand_to_numeric(
- operand: Any, spark_type: Optional[DataType] = None
+ operand: Any, *, spark_type: Optional[DataType] = None
) -> Any:
"""Transform boolean operand to numeric.
@@ -92,7 +93,14 @@ def transform_boolean_operand_to_numeric(
if isinstance(operand, IndexOpsMixin) and isinstance(operand.spark.data_type, BooleanType):
assert spark_type, "spark_type must be provided if the operand is a boolean IndexOpsMixin"
- return operand.spark.transform(lambda scol: scol.cast(spark_type))
+ assert isinstance(spark_type, NumericType), "spark_type must be NumericType"
+ dtype = spark_type_to_pandas_dtype(
+ spark_type, use_extension_dtypes=operand._internal.data_fields[0].is_extension_dtype
+ )
+ return operand._with_new_scol(
+ operand.spark.column.cast(spark_type),
+ field=operand._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
+ )
elif isinstance(operand, bool):
return int(operand)
else:
@@ -122,7 +130,7 @@ def _as_categorical_type(
scol = F.coalesce(map_scol.getItem(index_ops.spark.column), SF.lit(-1))
return index_ops._with_new_scol(
- scol.cast(spark_type).alias(index_ops._internal.data_fields[0].name),
+ scol.cast(spark_type),
field=index_ops._internal.data_fields[0].copy(
dtype=dtype, spark_type=spark_type, nullable=False
),
@@ -131,17 +139,15 @@ def _as_categorical_type(
def _as_bool_type(index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
"""Cast `index_ops` to BooleanType Spark type, given `dtype`."""
- from pyspark.pandas.internal import InternalField
-
+ spark_type = BooleanType()
if isinstance(dtype, extension_dtypes):
- scol = index_ops.spark.column.cast(BooleanType())
+ scol = index_ops.spark.column.cast(spark_type)
else:
scol = F.when(index_ops.spark.column.isNull(), SF.lit(False)).otherwise(
- index_ops.spark.column.cast(BooleanType())
+ index_ops.spark.column.cast(spark_type)
)
return index_ops._with_new_scol(
- scol.alias(index_ops._internal.data_spark_column_names[0]),
- field=InternalField(dtype=dtype),
+ scol, field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type)
)
@@ -151,16 +157,14 @@ def _as_string_type(
"""Cast `index_ops` to StringType Spark type, given `dtype` and `null_str`,
representing null Spark column.
"""
- from pyspark.pandas.internal import InternalField
-
+ spark_type = StringType()
if isinstance(dtype, extension_dtypes):
- scol = index_ops.spark.column.cast(StringType())
+ scol = index_ops.spark.column.cast(spark_type)
else:
- casted = index_ops.spark.column.cast(StringType())
+ casted = index_ops.spark.column.cast(spark_type)
scol = F.when(index_ops.spark.column.isNull(), null_str).otherwise(casted)
return index_ops._with_new_scol(
- scol.alias(index_ops._internal.data_spark_column_names[0]),
- field=InternalField(dtype=dtype),
+ scol, field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type)
)
@@ -181,10 +185,7 @@ def _as_other_type(
assert not need_pre_process, "Pre-processing is needed before the type casting."
scol = index_ops.spark.column.cast(spark_type)
- return index_ops._with_new_scol(
- scol.alias(index_ops._internal.data_spark_column_names[0]),
- field=InternalField(dtype=dtype),
- )
+ return index_ops._with_new_scol(scol, field=InternalField(dtype=dtype))
class DataTypeOps(object, metaclass=ABCMeta):
diff --git a/python/pyspark/pandas/data_type_ops/boolean_ops.py b/python/pyspark/pandas/data_type_ops/boolean_ops.py
index a9f9239..9ec295e 100644
--- a/python/pyspark/pandas/data_type_ops/boolean_ops.py
+++ b/python/pyspark/pandas/data_type_ops/boolean_ops.py
@@ -31,10 +31,8 @@ from pyspark.pandas.data_type_ops.base import (
_as_categorical_type,
_as_other_type,
)
-from pyspark.pandas.internal import InternalField
from pyspark.pandas.spark import functions as SF
-from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type
-from pyspark.pandas.typedef.typehints import as_spark_type
+from pyspark.pandas.typedef.typehints import as_spark_type, extension_dtypes, pandas_on_spark_type
from pyspark.sql import functions as F
from pyspark.sql.column import Column
from pyspark.sql.types import BooleanType, StringType
@@ -58,14 +56,14 @@ class BooleanOps(DataTypeOps):
if isinstance(right, bool):
return left.__or__(right)
elif isinstance(right, numbers.Number):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return left + right
else:
assert isinstance(right, IndexOpsMixin)
if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, BooleanType):
return left.__or__(right)
else:
- left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+ left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
return left + right
def sub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -73,12 +71,12 @@ class BooleanOps(DataTypeOps):
raise TypeError(
"Subtraction can not be applied to %s and the given type." % self.pretty_name
)
- if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ if isinstance(right, numbers.Number):
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return left - right
else:
assert isinstance(right, IndexOpsMixin)
- left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+ left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
return left - right
def mul(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -89,14 +87,14 @@ class BooleanOps(DataTypeOps):
if isinstance(right, bool):
return left.__and__(right)
elif isinstance(right, numbers.Number):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return left * right
else:
assert isinstance(right, IndexOpsMixin)
if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, BooleanType):
return left.__and__(right)
else:
- left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+ left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
return left * right
def truediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -104,12 +102,12 @@ class BooleanOps(DataTypeOps):
raise TypeError(
"True division can not be applied to %s and the given type." % self.pretty_name
)
- if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ if isinstance(right, numbers.Number):
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return left / right
else:
assert isinstance(right, IndexOpsMixin)
- left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+ left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
return left / right
def floordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -117,12 +115,12 @@ class BooleanOps(DataTypeOps):
raise TypeError(
"Floor division can not be applied to %s and the given type." % self.pretty_name
)
- if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ if isinstance(right, numbers.Number):
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return left // right
else:
assert isinstance(right, IndexOpsMixin)
- left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+ left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
return left // right
def mod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -130,12 +128,12 @@ class BooleanOps(DataTypeOps):
raise TypeError(
"Modulo can not be applied to %s and the given type." % self.pretty_name
)
- if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ if isinstance(right, numbers.Number):
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return left % right
else:
assert isinstance(right, IndexOpsMixin)
- left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+ left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
return left % right
def pow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -143,19 +141,19 @@ class BooleanOps(DataTypeOps):
raise TypeError(
"Exponentiation can not be applied to %s and the given type." % self.pretty_name
)
- if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ if isinstance(right, numbers.Number):
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return left ** right
else:
assert isinstance(right, IndexOpsMixin)
- left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+ left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
return left ** right
def radd(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if isinstance(right, bool):
return left.__or__(right)
elif isinstance(right, numbers.Number):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return right + left
else:
raise TypeError(
@@ -164,7 +162,7 @@ class BooleanOps(DataTypeOps):
def rsub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return right - left
else:
raise TypeError(
@@ -175,7 +173,7 @@ class BooleanOps(DataTypeOps):
if isinstance(right, bool):
return left.__and__(right)
elif isinstance(right, numbers.Number):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return right * left
else:
raise TypeError(
@@ -184,7 +182,7 @@ class BooleanOps(DataTypeOps):
def rtruediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return right / left
else:
raise TypeError(
@@ -193,7 +191,7 @@ class BooleanOps(DataTypeOps):
def rfloordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return right // left
else:
raise TypeError(
@@ -202,7 +200,7 @@ class BooleanOps(DataTypeOps):
def rpow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return right ** left
else:
raise TypeError(
@@ -211,7 +209,7 @@ class BooleanOps(DataTypeOps):
def rmod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if isinstance(right, numbers.Number) and not isinstance(right, bool):
- left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+ left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
return right % left
else:
raise TypeError(
@@ -261,13 +259,17 @@ class BooleanOps(DataTypeOps):
index_ops.spark.column.isNotNull(),
F.when(index_ops.spark.column, "True").otherwise("False"),
)
+ nullable = index_ops.spark.nullable
else:
null_str = str(None)
casted = F.when(index_ops.spark.column, "True").otherwise("False")
scol = F.when(index_ops.spark.column.isNull(), null_str).otherwise(casted)
+ nullable = False
return index_ops._with_new_scol(
- scol.alias(index_ops._internal.data_spark_column_names[0]),
- field=InternalField(dtype=dtype),
+ scol,
+ field=index_ops._internal.data_fields[0].copy(
+ dtype=dtype, spark_type=spark_type, nullable=nullable
+ ),
)
else:
return _as_other_type(index_ops, dtype, spark_type)
diff --git a/python/pyspark/pandas/data_type_ops/categorical_ops.py b/python/pyspark/pandas/data_type_ops/categorical_ops.py
index 8c2a27b..9238e6b 100644
--- a/python/pyspark/pandas/data_type_ops/categorical_ops.py
+++ b/python/pyspark/pandas/data_type_ops/categorical_ops.py
@@ -48,7 +48,7 @@ class CategoricalOps(DataTypeOps):
return col.cat.codes
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
- dtype, spark_type = pandas_on_spark_type(dtype)
+ dtype, _ = pandas_on_spark_type(dtype)
if isinstance(dtype, CategoricalDtype) and dtype.categories is None:
return index_ops.copy()
@@ -62,9 +62,7 @@ class CategoricalOps(DataTypeOps):
)
map_scol = F.create_map(*kvs)
scol = map_scol.getItem(index_ops.spark.column)
- return index_ops._with_new_scol(
- scol.alias(index_ops._internal.data_spark_column_names[0])
- ).astype(dtype)
+ return index_ops._with_new_scol(scol).astype(dtype)
# TODO(SPARK-35997): Implement comparison operators below
def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
diff --git a/python/pyspark/pandas/data_type_ops/date_ops.py b/python/pyspark/pandas/data_type_ops/date_ops.py
index 86fe8c3..59c8166 100644
--- a/python/pyspark/pandas/data_type_ops/date_ops.py
+++ b/python/pyspark/pandas/data_type_ops/date_ops.py
@@ -19,6 +19,7 @@ import datetime
import warnings
from typing import Any, Union
+import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype
@@ -29,7 +30,6 @@ from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
from pyspark.pandas.base import column_op, IndexOpsMixin
from pyspark.pandas.data_type_ops.base import (
DataTypeOps,
- _as_bool_type,
_as_categorical_type,
_as_other_type,
_as_string_type,
@@ -104,7 +104,12 @@ class DateOps(DataTypeOps):
if isinstance(dtype, CategoricalDtype):
return _as_categorical_type(index_ops, dtype, spark_type)
elif isinstance(spark_type, BooleanType):
- return _as_bool_type(index_ops, dtype)
+ return index_ops._with_new_scol(
+ index_ops.spark.column.isNotNull(),
+ field=index_ops._internal.data_fields[0].copy(
+ dtype=np.dtype(bool), spark_type=spark_type, nullable=False
+ ),
+ )
elif isinstance(spark_type, StringType):
return _as_string_type(index_ops, dtype, null_str=str(pd.NaT))
else:
diff --git a/python/pyspark/pandas/data_type_ops/datetime_ops.py b/python/pyspark/pandas/data_type_ops/datetime_ops.py
index a30dc96..f815742 100644
--- a/python/pyspark/pandas/data_type_ops/datetime_ops.py
+++ b/python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -19,11 +19,12 @@ import datetime
import warnings
from typing import Any, Union, cast
+import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype
from pyspark.sql import functions as F, Column
-from pyspark.sql.types import BooleanType, StringType, TimestampType
+from pyspark.sql.types import BooleanType, LongType, StringType, TimestampType
from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
from pyspark.pandas.base import IndexOpsMixin
@@ -33,9 +34,8 @@ from pyspark.pandas.data_type_ops.base import (
_as_categorical_type,
_as_other_type,
)
-from pyspark.pandas.internal import InternalField
from pyspark.pandas.spark import functions as SF
-from pyspark.pandas.typedef import as_spark_type, extension_dtypes, pandas_on_spark_type
+from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type
class DatetimeOps(DataTypeOps):
@@ -62,8 +62,11 @@ class DatetimeOps(DataTypeOps):
warnings.warn(msg, UserWarning)
return cast(
SeriesOrIndex,
- left.spark.transform(
- lambda scol: scol.astype("long") - SF.lit(right).cast(as_spark_type("long"))
+ left._with_new_scol(
+ left.spark.column.cast(LongType()) - SF.lit(right).cast(LongType()),
+ field=left._internal.data_fields[0].copy(
+ dtype=np.dtype("int64"), spark_type=LongType()
+ ),
),
)
else:
@@ -81,8 +84,11 @@ class DatetimeOps(DataTypeOps):
warnings.warn(msg, UserWarning)
return cast(
SeriesOrIndex,
- left.spark.transform(
- lambda scol: SF.lit(right).cast(as_spark_type("long")) - scol.astype("long")
+ left._with_new_scol(
+ SF.lit(right).cast(LongType()) - left.spark.column.cast(LongType()),
+ field=left._internal.data_fields[0].copy(
+ dtype=np.dtype("int64"), spark_type=LongType()
+ ),
),
)
else:
@@ -131,7 +137,7 @@ class DatetimeOps(DataTypeOps):
scol = F.when(index_ops.spark.column.isNull(), null_str).otherwise(casted)
return index_ops._with_new_scol(
scol.alias(index_ops._internal.data_spark_column_names[0]),
- field=InternalField(dtype=dtype),
+ field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
)
else:
return _as_other_type(index_ops, dtype, spark_type)
diff --git a/python/pyspark/pandas/data_type_ops/num_ops.py b/python/pyspark/pandas/data_type_ops/num_ops.py
index 8b26843..ed089e5 100644
--- a/python/pyspark/pandas/data_type_ops/num_ops.py
+++ b/python/pyspark/pandas/data_type_ops/num_ops.py
@@ -33,9 +33,8 @@ from pyspark.pandas.data_type_ops.base import (
_as_other_type,
_as_string_type,
)
-from pyspark.pandas.internal import InternalField
from pyspark.pandas.spark import functions as SF
-from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type
+from pyspark.pandas.typedef.typehints import extension_dtypes, pandas_on_spark_type
from pyspark.sql import functions as F
from pyspark.sql.column import Column
from pyspark.sql.types import (
@@ -55,38 +54,34 @@ class NumericOps(DataTypeOps):
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("Addition can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return column_op(Column.__add__)(left, right)
def sub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("Subtraction can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return column_op(Column.__sub__)(left, right)
def mod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("Modulo can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
def mod(left: Column, right: Any) -> Column:
return ((left % right) + right) % right
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return column_op(mod)(left, right)
def pow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("Exponentiation can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
def pow_func(left: Column, right: Any) -> Column:
return F.when(left == 1, left).otherwise(Column.__pow__(left, right))
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return column_op(pow_func)(left, right)
def radd(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -179,29 +174,25 @@ class IntegralOps(NumericOps):
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("Multiplication can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return column_op(Column.__mul__)(left, right)
def truediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("True division can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
def truediv(left: Column, right: Any) -> Column:
return F.when(
SF.lit(right != 0) | SF.lit(right).isNull(), left.__div__(right)
).otherwise(SF.lit(np.inf).__div__(left))
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return numpy_column_op(truediv)(left, right)
def floordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("Floor division can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
def floordiv(left: Column, right: Any) -> Column:
return F.when(SF.lit(right is np.nan), np.nan).otherwise(
F.when(
@@ -209,6 +200,7 @@ class IntegralOps(NumericOps):
).otherwise(SF.lit(np.inf).__div__(left))
)
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return numpy_column_op(floordiv)(left, right)
def rtruediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -220,7 +212,7 @@ class IntegralOps(NumericOps):
SF.lit(right).__truediv__(left)
)
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return numpy_column_op(rtruediv)(left, right)
def rfloordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -232,7 +224,7 @@ class IntegralOps(NumericOps):
F.floor(SF.lit(right).__div__(left))
)
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return numpy_column_op(rfloordiv)(left, right)
def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
@@ -262,16 +254,13 @@ class FractionalOps(NumericOps):
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("Multiplication can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return column_op(Column.__mul__)(left, right)
def truediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("True division can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
def truediv(left: Column, right: Any) -> Column:
return F.when(
SF.lit(right != 0) | SF.lit(right).isNull(), left.__div__(right)
@@ -281,14 +270,13 @@ class FractionalOps(NumericOps):
)
)
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return numpy_column_op(truediv)(left, right)
def floordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if not is_valid_operand_for_numeric_arithmetic(right):
raise TypeError("Floor division can not be applied to given types.")
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
def floordiv(left: Column, right: Any) -> Column:
return F.when(SF.lit(right is np.nan), np.nan).otherwise(
F.when(
@@ -300,6 +288,7 @@ class FractionalOps(NumericOps):
)
)
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return numpy_column_op(floordiv)(left, right)
def rtruediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -311,7 +300,7 @@ class FractionalOps(NumericOps):
SF.lit(right).__truediv__(left)
)
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return numpy_column_op(rtruediv)(left, right)
def rfloordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -325,7 +314,7 @@ class FractionalOps(NumericOps):
)
)
- right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
+ right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
return numpy_column_op(rfloordiv)(left, right)
def isnull(self, index_ops: IndexOpsLike) -> IndexOpsLike:
@@ -351,7 +340,7 @@ class FractionalOps(NumericOps):
).otherwise(index_ops.spark.column.cast(spark_type))
return index_ops._with_new_scol(
scol.alias(index_ops._internal.data_spark_column_names[0]),
- field=InternalField(dtype=dtype),
+ field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
)
elif isinstance(spark_type, StringType):
return _as_string_type(index_ops, dtype, null_str=str(np.nan))
diff --git a/python/pyspark/pandas/data_type_ops/string_ops.py b/python/pyspark/pandas/data_type_ops/string_ops.py
index 9c6ca4c..b2c4259 100644
--- a/python/pyspark/pandas/data_type_ops/string_ops.py
+++ b/python/pyspark/pandas/data_type_ops/string_ops.py
@@ -31,7 +31,6 @@ from pyspark.pandas.data_type_ops.base import (
_as_other_type,
_as_string_type,
)
-from pyspark.pandas.internal import InternalField
from pyspark.pandas.spark import functions as SF
from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type
from pyspark.sql import Column
@@ -48,19 +47,31 @@ class StringOps(DataTypeOps):
return "strings"
def add(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
- if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, StringType):
+ if isinstance(right, str):
+ return cast(
+ SeriesOrIndex,
+ left._with_new_scol(
+ F.concat(left.spark.column, SF.lit(right)), field=left._internal.data_fields[0]
+ ),
+ )
+ elif isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, StringType):
return column_op(F.concat)(left, right)
- elif isinstance(right, str):
- return column_op(F.concat)(left, SF.lit(right))
else:
raise TypeError("Addition can not be applied to given types.")
def mul(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
- if (
+ if isinstance(right, int):
+ return cast(
+ SeriesOrIndex,
+ left._with_new_scol(
+ SF.repeat(left.spark.column, right), field=left._internal.data_fields[0]
+ ),
+ )
+ elif (
isinstance(right, IndexOpsMixin)
and isinstance(right.spark.data_type, IntegralType)
and not isinstance(right.dtype, CategoricalDtype)
- ) or isinstance(right, int):
+ ):
return column_op(SF.repeat)(left, right)
else:
raise TypeError("Multiplication can not be applied to given types.")
@@ -69,14 +80,21 @@ class StringOps(DataTypeOps):
if isinstance(right, str):
return cast(
SeriesOrIndex,
- left._with_new_scol(F.concat(SF.lit(right), left.spark.column)), # TODO: dtype?
+ left._with_new_scol(
+ F.concat(SF.lit(right), left.spark.column), field=left._internal.data_fields[0]
+ ),
)
else:
raise TypeError("Addition can not be applied to given types.")
def rmul(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
if isinstance(right, int):
- return column_op(SF.repeat)(left, right)
+ return cast(
+ SeriesOrIndex,
+ left._with_new_scol(
+ SF.repeat(left.spark.column, right), field=left._internal.data_fields[0]
+ ),
+ )
else:
raise TypeError("Multiplication can not be applied to given types.")
@@ -114,8 +132,8 @@ class StringOps(DataTypeOps):
F.length(index_ops.spark.column) > 0
)
return index_ops._with_new_scol(
- scol.alias(index_ops._internal.data_spark_column_names[0]),
- field=InternalField(dtype=dtype),
+ scol,
+ field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
)
elif isinstance(spark_type, StringType):
return _as_string_type(index_ops, dtype)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
index fef9fb1..fa37df0 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
@@ -26,7 +26,10 @@ from pandas.api.types import CategoricalDtype
from pyspark import pandas as ps
from pyspark.pandas.config import option_context
from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
-from pyspark.pandas.typedef.typehints import extension_object_dtypes_available
+from pyspark.pandas.typedef.typehints import (
+ extension_float_dtypes_available,
+ extension_object_dtypes_available,
+)
from pyspark.sql.types import BooleanType
from pyspark.testing.pandasutils import PandasOnSparkTestCase
@@ -399,8 +402,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def test_add(self):
pser = self.pser
psser = self.psser
- self.assert_eq((pser + 1).astype(float), psser + 1)
- self.assert_eq((pser + 0.1).astype(float), psser + 0.1)
+ self.check_extension(pser + 1, psser + 1)
+ if extension_float_dtypes_available:
+ self.check_extension(pser + 0.1, psser + 0.1)
+ else:
+ self.assert_eq(pser + 0.1, psser + 0.1)
# In pandas, NA | True is NA, whereas NA | True is True in pandas-on-Spark
self.check_extension(ps.Series([True, True, True], dtype="boolean"), psser + True)
@@ -420,8 +426,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def test_sub(self):
pser = self.pser
psser = self.psser
- self.assert_eq((pser - 1).astype(float), psser - 1)
- self.assert_eq((pser - 0.1).astype(float), psser - 0.1)
+ self.check_extension(pser - 1, psser - 1)
+ if extension_float_dtypes_available:
+ self.check_extension(pser - 0.1, psser - 0.1)
+ else:
+ self.assert_eq(pser - 0.1, psser - 0.1)
self.assertRaises(TypeError, lambda: psser - psser)
self.assertRaises(TypeError, lambda: psser - True)
@@ -434,8 +443,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def test_mul(self):
pser = self.pser
psser = self.psser
- self.assert_eq((pser * 1).astype(float), psser * 1)
- self.assert_eq((pser * 0.1).astype(float), psser * 0.1)
+ self.check_extension(pser * 1, psser * 1)
+ if extension_float_dtypes_available:
+ self.check_extension(pser * 0.1, psser * 0.1)
+ else:
+ self.assert_eq(pser * 0.1, psser * 0.1)
# In pandas, NA & False is NA, whereas NA & False is False in pandas-on-Spark
self.check_extension(pser * True, psser * True)
@@ -455,8 +467,12 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def test_truediv(self):
pser = self.pser
psser = self.psser
- self.assert_eq((pser / 1).astype(float), psser / 1)
- self.assert_eq((pser / 0.1).astype(float), psser / 0.1)
+ if extension_float_dtypes_available:
+ self.check_extension(pser / 1, psser / 1)
+ self.check_extension(pser / 0.1, psser / 0.1)
+ else:
+ self.assert_eq(pser / 1, psser / 1)
+ self.assert_eq(pser / 0.1, psser / 0.1)
self.assertRaises(TypeError, lambda: psser / psser)
self.assertRaises(TypeError, lambda: psser / True)
@@ -474,7 +490,10 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
psser = self.psser
# float is always returned in pandas-on-Spark
- self.assert_eq((pser // 1).astype("float"), psser // 1)
+ if extension_float_dtypes_available:
+ self.check_extension((pser // 1).astype("Float64"), psser // 1)
+ else:
+ self.assert_eq((pser // 1).astype("float"), psser // 1)
# in pandas, 1 // 0.1 = 9.0; in pandas-on-Spark, 1 // 0.1 = 10.0
# self.assert_eq(pser // 0.1, psser // 0.1)
@@ -494,8 +513,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
def test_mod(self):
pser = self.pser
psser = self.psser
- self.assert_eq((pser % 1).astype(float), psser % 1)
- self.assert_eq((pser % 0.1).astype(float), psser % 0.1)
+ self.check_extension(pser % 1, psser % 1)
+ if extension_float_dtypes_available:
+ self.check_extension(pser % 0.1, psser % 0.1)
+ else:
+ self.assert_eq(pser % 0.1, psser % 0.1)
self.assertRaises(TypeError, lambda: psser % psser)
self.assertRaises(TypeError, lambda: psser % True)
@@ -509,9 +531,18 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
pser = self.pser
psser = self.psser
# float is always returned in pandas-on-Spark
- self.assert_eq((pser ** 1).astype("float"), psser ** 1)
- self.assert_eq((pser ** 0.1).astype("float"), self.psser ** 0.1)
- self.assert_eq((pser ** pser.astype(float)).astype("float"), psser ** psser.astype(float))
+ if extension_float_dtypes_available:
+ self.check_extension((pser ** 1).astype("Float64"), psser ** 1)
+ self.check_extension((pser ** 0.1).astype("Float64"), self.psser ** 0.1)
+ self.check_extension(
+ (pser ** pser.astype(float)).astype("Float64"), psser ** psser.astype(float)
+ )
+ else:
+ self.assert_eq((pser ** 1).astype("float"), psser ** 1)
+ self.assert_eq((pser ** 0.1).astype("float"), self.psser ** 0.1)
+ self.assert_eq(
+ (pser ** pser.astype(float)).astype("float"), psser ** psser.astype(float)
+ )
self.assertRaises(TypeError, lambda: psser ** psser)
self.assertRaises(TypeError, lambda: psser ** True)
@@ -526,8 +557,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
self.assertRaises(TypeError, lambda: self.psser ** psser)
def test_radd(self):
- self.assert_eq((1 + self.pser).astype(float), 1 + self.psser)
- self.assert_eq((0.1 + self.pser).astype(float), 0.1 + self.psser)
+ self.check_extension(1 + self.pser, 1 + self.psser)
+ if extension_float_dtypes_available:
+ self.check_extension(0.1 + self.pser, 0.1 + self.psser)
+ else:
+ self.assert_eq(0.1 + self.pser, 0.1 + self.psser)
self.assertRaises(TypeError, lambda: "x" + self.psser)
# In pandas, NA | True is NA, whereas NA | True is True in pandas-on-Spark
@@ -538,16 +572,22 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) + self.psser)
def test_rsub(self):
- self.assert_eq((1 - self.pser).astype(float), 1 - self.psser)
- self.assert_eq((0.1 - self.pser).astype(float), 0.1 - self.psser)
+ self.check_extension(1 - self.pser, 1 - self.psser)
+ if extension_float_dtypes_available:
+ self.check_extension(0.1 - self.pser, 0.1 - self.psser)
+ else:
+ self.assert_eq(0.1 - self.pser, 0.1 - self.psser)
self.assertRaises(TypeError, lambda: "x" - self.psser)
self.assertRaises(TypeError, lambda: True - self.psser)
self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) - self.psser)
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) - self.psser)
def test_rmul(self):
- self.assert_eq((1 * self.pser).astype(float), 1 * self.psser)
- self.assert_eq((0.1 * self.pser).astype(float), 0.1 * self.psser)
+ self.check_extension(1 * self.pser, 1 * self.psser)
+ if extension_float_dtypes_available:
+ self.check_extension(0.1 * self.pser, 0.1 * self.psser)
+ else:
+ self.assert_eq(0.1 * self.pser, 0.1 * self.psser)
self.assertRaises(TypeError, lambda: "x" * self.psser)
# In pandas, NA & False is NA, whereas NA & False is False in pandas-on-Spark
@@ -558,35 +598,49 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) * self.psser)
def test_rtruediv(self):
- self.assert_eq((1 / self.pser).astype(float), 1 / self.psser)
- self.assert_eq((0.1 / self.pser).astype(float), 0.1 / self.psser)
+ if extension_float_dtypes_available:
+ self.check_extension(1 / self.pser, 1 / self.psser)
+ self.check_extension(0.1 / self.pser, 0.1 / self.psser)
+ else:
+ self.assert_eq(1 / self.pser, 1 / self.psser)
+ self.assert_eq(0.1 / self.pser, 0.1 / self.psser)
self.assertRaises(TypeError, lambda: "x" / self.psser)
self.assertRaises(TypeError, lambda: True / self.psser)
self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) / self.psser)
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) / self.psser)
def test_rfloordiv(self):
- self.assert_eq((1 // self.psser).astype(float), ps.Series([1.0, np.inf, np.nan]))
- self.assert_eq((0.1 // self.psser).astype(float), ps.Series([0.0, np.inf, np.nan]))
+ self.assert_eq(pd.Series([1.0, np.inf, np.nan]), (1 // self.psser).astype(float))
+ self.assert_eq(pd.Series([0.0, np.inf, np.nan]), (0.1 // self.psser).astype(float))
self.assertRaises(TypeError, lambda: "x" // self.psser)
self.assertRaises(TypeError, lambda: True // self.psser)
self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) // self.psser)
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) // self.psser)
def test_rpow(self):
- self.assert_eq(1 ** self.psser, ps.Series([1, 1, 1], dtype=float))
- self.assert_eq((0.1 ** self.pser).astype(float), 0.1 ** self.psser)
+ if extension_float_dtypes_available:
+ self.check_extension(pd.Series([1, 1, 1], dtype="Float64"), 1 ** self.psser)
+ self.check_extension((0.1 ** self.pser).astype("Float64"), 0.1 ** self.psser)
+ else:
+ self.assert_eq(pd.Series([1, 1, 1], dtype="float"), 1 ** self.psser)
+ self.assert_eq((0.1 ** self.pser).astype("float"), 0.1 ** self.psser)
self.assertRaises(TypeError, lambda: "x" ** self.psser)
self.assertRaises(TypeError, lambda: True ** self.psser)
self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) ** self.psser)
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) ** self.psser)
def test_rmod(self):
- self.assert_eq(ps.Series([0, np.nan, np.nan], dtype=float), 1 % self.psser)
- self.assert_eq(
- ps.Series([0.10000000000000009, np.nan, np.nan], dtype=float),
- 0.1 % self.psser,
- )
+ self.check_extension(ps.Series([0, np.nan, np.nan], dtype="Int64"), 1 % self.psser)
+ if extension_float_dtypes_available:
+ self.check_extension(
+ pd.Series([0.10000000000000009, np.nan, np.nan], dtype="Float64"),
+ 0.1 % self.psser,
+ )
+ else:
+ self.assert_eq(
+ pd.Series([0.10000000000000009, np.nan, np.nan], dtype="float"),
+ 0.1 % self.psser,
+ )
self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) % self.psser)
self.assertRaises(TypeError, lambda: True % self.psser)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
index edfe806..1574ebf 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
@@ -190,7 +190,7 @@ class DateOpsTest(PandasOnSparkTestCase, TestCasesUtils):
pser = self.pser
psser = self.psser
self.assert_eq(pser.astype(str), psser.astype(str))
- self.assert_eq(pd.Series([None, None, None]), psser.astype(bool))
+ self.assert_eq(pser.astype(bool), psser.astype(bool))
cat_type = CategoricalDtype(categories=["a", "b", "c"])
self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
diff --git a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
index fc843c4..4bda305 100644
--- a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
+++ b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
@@ -164,7 +164,7 @@ class TestCasesUtils(object):
+ self.integral_extension_dtypes
)
- def check_extension(self, psser, pser):
+ def check_extension(self, left, right):
"""
Compare `psser` and `pser` of numeric ExtensionDtypes.
@@ -172,7 +172,8 @@ class TestCasesUtils(object):
pandas versions. Please refer to https://github.com/pandas-dev/pandas/issues/39410.
"""
if LooseVersion("1.1") <= LooseVersion(pd.__version__) < LooseVersion("1.2.2"):
- self.assert_eq(psser, pser, check_exact=False)
- self.assertTrue(isinstance(psser.dtype, extension_dtypes))
+ self.assert_eq(left, right, check_exact=False)
+ self.assertTrue(isinstance(left.dtype, extension_dtypes))
+ self.assertTrue(isinstance(right.dtype, extension_dtypes))
else:
- self.assert_eq(psser, pser)
+ self.assert_eq(left, right)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org