You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/12 02:55:40 UTC

[spark] branch master updated: [SPARK-36064][PYTHON] Manage InternalField more in DataTypeOps

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 95e6c6e  [SPARK-36064][PYTHON] Manage InternalField more in DataTypeOps
95e6c6e is described below

commit 95e6c6e3e950b95b2b303d2fd5d859f752e2ca5e
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Mon Jul 12 11:55:05 2021 +0900

    [SPARK-36064][PYTHON] Manage InternalField more in DataTypeOps
    
    ### What changes were proposed in this pull request?
    
    Properly set `InternalField` in more places in `DataTypeOps`.
    
    ### Why are the changes needed?
    
    There are more places in `DataTypeOps` where we can manage `InternalField`.
    We should manage `InternalField` for these cases.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #33275 from ueshin/issues/SPARK-36064/fields.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/pandas/data_type_ops/base.py        |  39 +++----
 python/pyspark/pandas/data_type_ops/boolean_ops.py |  64 +++++------
 .../pandas/data_type_ops/categorical_ops.py        |   6 +-
 python/pyspark/pandas/data_type_ops/date_ops.py    |   9 +-
 .../pyspark/pandas/data_type_ops/datetime_ops.py   |  22 ++--
 python/pyspark/pandas/data_type_ops/num_ops.py     |  43 +++-----
 python/pyspark/pandas/data_type_ops/string_ops.py  |  38 +++++--
 .../pandas/tests/data_type_ops/test_boolean_ops.py | 118 +++++++++++++++------
 .../pandas/tests/data_type_ops/test_date_ops.py    |   2 +-
 .../pandas/tests/data_type_ops/testing_utils.py    |   9 +-
 10 files changed, 212 insertions(+), 138 deletions(-)

diff --git a/python/pyspark/pandas/data_type_ops/base.py b/python/pyspark/pandas/data_type_ops/base.py
index 71aa943..c79408b 100644
--- a/python/pyspark/pandas/data_type_ops/base.py
+++ b/python/pyspark/pandas/data_type_ops/base.py
@@ -49,6 +49,7 @@ from pyspark.pandas.typedef.typehints import (
     extension_dtypes_available,
     extension_float_dtypes_available,
     extension_object_dtypes_available,
+    spark_type_to_pandas_dtype,
 )
 
 if extension_dtypes_available:
@@ -79,7 +80,7 @@ def is_valid_operand_for_numeric_arithmetic(operand: Any, *, allow_bool: bool =
 
 
 def transform_boolean_operand_to_numeric(
-    operand: Any, spark_type: Optional[DataType] = None
+    operand: Any, *, spark_type: Optional[DataType] = None
 ) -> Any:
     """Transform boolean operand to numeric.
 
@@ -92,7 +93,14 @@ def transform_boolean_operand_to_numeric(
 
     if isinstance(operand, IndexOpsMixin) and isinstance(operand.spark.data_type, BooleanType):
         assert spark_type, "spark_type must be provided if the operand is a boolean IndexOpsMixin"
-        return operand.spark.transform(lambda scol: scol.cast(spark_type))
+        assert isinstance(spark_type, NumericType), "spark_type must be NumericType"
+        dtype = spark_type_to_pandas_dtype(
+            spark_type, use_extension_dtypes=operand._internal.data_fields[0].is_extension_dtype
+        )
+        return operand._with_new_scol(
+            operand.spark.column.cast(spark_type),
+            field=operand._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
+        )
     elif isinstance(operand, bool):
         return int(operand)
     else:
@@ -122,7 +130,7 @@ def _as_categorical_type(
 
             scol = F.coalesce(map_scol.getItem(index_ops.spark.column), SF.lit(-1))
         return index_ops._with_new_scol(
-            scol.cast(spark_type).alias(index_ops._internal.data_fields[0].name),
+            scol.cast(spark_type),
             field=index_ops._internal.data_fields[0].copy(
                 dtype=dtype, spark_type=spark_type, nullable=False
             ),
@@ -131,17 +139,15 @@ def _as_categorical_type(
 
 def _as_bool_type(index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
     """Cast `index_ops` to BooleanType Spark type, given `dtype`."""
-    from pyspark.pandas.internal import InternalField
-
+    spark_type = BooleanType()
     if isinstance(dtype, extension_dtypes):
-        scol = index_ops.spark.column.cast(BooleanType())
+        scol = index_ops.spark.column.cast(spark_type)
     else:
         scol = F.when(index_ops.spark.column.isNull(), SF.lit(False)).otherwise(
-            index_ops.spark.column.cast(BooleanType())
+            index_ops.spark.column.cast(spark_type)
         )
     return index_ops._with_new_scol(
-        scol.alias(index_ops._internal.data_spark_column_names[0]),
-        field=InternalField(dtype=dtype),
+        scol, field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type)
     )
 
 
@@ -151,16 +157,14 @@ def _as_string_type(
     """Cast `index_ops` to StringType Spark type, given `dtype` and `null_str`,
     representing null Spark column.
     """
-    from pyspark.pandas.internal import InternalField
-
+    spark_type = StringType()
     if isinstance(dtype, extension_dtypes):
-        scol = index_ops.spark.column.cast(StringType())
+        scol = index_ops.spark.column.cast(spark_type)
     else:
-        casted = index_ops.spark.column.cast(StringType())
+        casted = index_ops.spark.column.cast(spark_type)
         scol = F.when(index_ops.spark.column.isNull(), null_str).otherwise(casted)
     return index_ops._with_new_scol(
-        scol.alias(index_ops._internal.data_spark_column_names[0]),
-        field=InternalField(dtype=dtype),
+        scol, field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type)
     )
 
 
@@ -181,10 +185,7 @@ def _as_other_type(
     assert not need_pre_process, "Pre-processing is needed before the type casting."
 
     scol = index_ops.spark.column.cast(spark_type)
-    return index_ops._with_new_scol(
-        scol.alias(index_ops._internal.data_spark_column_names[0]),
-        field=InternalField(dtype=dtype),
-    )
+    return index_ops._with_new_scol(scol, field=InternalField(dtype=dtype))
 
 
 class DataTypeOps(object, metaclass=ABCMeta):
diff --git a/python/pyspark/pandas/data_type_ops/boolean_ops.py b/python/pyspark/pandas/data_type_ops/boolean_ops.py
index a9f9239..9ec295e 100644
--- a/python/pyspark/pandas/data_type_ops/boolean_ops.py
+++ b/python/pyspark/pandas/data_type_ops/boolean_ops.py
@@ -31,10 +31,8 @@ from pyspark.pandas.data_type_ops.base import (
     _as_categorical_type,
     _as_other_type,
 )
-from pyspark.pandas.internal import InternalField
 from pyspark.pandas.spark import functions as SF
-from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type
-from pyspark.pandas.typedef.typehints import as_spark_type
+from pyspark.pandas.typedef.typehints import as_spark_type, extension_dtypes, pandas_on_spark_type
 from pyspark.sql import functions as F
 from pyspark.sql.column import Column
 from pyspark.sql.types import BooleanType, StringType
@@ -58,14 +56,14 @@ class BooleanOps(DataTypeOps):
         if isinstance(right, bool):
             return left.__or__(right)
         elif isinstance(right, numbers.Number):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return left + right
         else:
             assert isinstance(right, IndexOpsMixin)
             if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, BooleanType):
                 return left.__or__(right)
             else:
-                left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+                left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
                 return left + right
 
     def sub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -73,12 +71,12 @@ class BooleanOps(DataTypeOps):
             raise TypeError(
                 "Subtraction can not be applied to %s and the given type." % self.pretty_name
             )
-        if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+        if isinstance(right, numbers.Number):
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return left - right
         else:
             assert isinstance(right, IndexOpsMixin)
-            left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+            left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
             return left - right
 
     def mul(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -89,14 +87,14 @@ class BooleanOps(DataTypeOps):
         if isinstance(right, bool):
             return left.__and__(right)
         elif isinstance(right, numbers.Number):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return left * right
         else:
             assert isinstance(right, IndexOpsMixin)
             if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, BooleanType):
                 return left.__and__(right)
             else:
-                left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+                left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
                 return left * right
 
     def truediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -104,12 +102,12 @@ class BooleanOps(DataTypeOps):
             raise TypeError(
                 "True division can not be applied to %s and the given type." % self.pretty_name
             )
-        if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+        if isinstance(right, numbers.Number):
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return left / right
         else:
             assert isinstance(right, IndexOpsMixin)
-            left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+            left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
             return left / right
 
     def floordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -117,12 +115,12 @@ class BooleanOps(DataTypeOps):
             raise TypeError(
                 "Floor division can not be applied to %s and the given type." % self.pretty_name
             )
-        if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+        if isinstance(right, numbers.Number):
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return left // right
         else:
             assert isinstance(right, IndexOpsMixin)
-            left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+            left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
             return left // right
 
     def mod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -130,12 +128,12 @@ class BooleanOps(DataTypeOps):
             raise TypeError(
                 "Modulo can not be applied to %s and the given type." % self.pretty_name
             )
-        if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+        if isinstance(right, numbers.Number):
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return left % right
         else:
             assert isinstance(right, IndexOpsMixin)
-            left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+            left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
             return left % right
 
     def pow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -143,19 +141,19 @@ class BooleanOps(DataTypeOps):
             raise TypeError(
                 "Exponentiation can not be applied to %s and the given type." % self.pretty_name
             )
-        if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+        if isinstance(right, numbers.Number):
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return left ** right
         else:
             assert isinstance(right, IndexOpsMixin)
-            left = transform_boolean_operand_to_numeric(left, right.spark.data_type)
+            left = transform_boolean_operand_to_numeric(left, spark_type=right.spark.data_type)
             return left ** right
 
     def radd(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if isinstance(right, bool):
             return left.__or__(right)
         elif isinstance(right, numbers.Number):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return right + left
         else:
             raise TypeError(
@@ -164,7 +162,7 @@ class BooleanOps(DataTypeOps):
 
     def rsub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return right - left
         else:
             raise TypeError(
@@ -175,7 +173,7 @@ class BooleanOps(DataTypeOps):
         if isinstance(right, bool):
             return left.__and__(right)
         elif isinstance(right, numbers.Number):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return right * left
         else:
             raise TypeError(
@@ -184,7 +182,7 @@ class BooleanOps(DataTypeOps):
 
     def rtruediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return right / left
         else:
             raise TypeError(
@@ -193,7 +191,7 @@ class BooleanOps(DataTypeOps):
 
     def rfloordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return right // left
         else:
             raise TypeError(
@@ -202,7 +200,7 @@ class BooleanOps(DataTypeOps):
 
     def rpow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return right ** left
         else:
             raise TypeError(
@@ -211,7 +209,7 @@ class BooleanOps(DataTypeOps):
 
     def rmod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if isinstance(right, numbers.Number) and not isinstance(right, bool):
-            left = left.spark.transform(lambda scol: scol.cast(as_spark_type(type(right))))
+            left = transform_boolean_operand_to_numeric(left, spark_type=as_spark_type(type(right)))
             return right % left
         else:
             raise TypeError(
@@ -261,13 +259,17 @@ class BooleanOps(DataTypeOps):
                     index_ops.spark.column.isNotNull(),
                     F.when(index_ops.spark.column, "True").otherwise("False"),
                 )
+                nullable = index_ops.spark.nullable
             else:
                 null_str = str(None)
                 casted = F.when(index_ops.spark.column, "True").otherwise("False")
                 scol = F.when(index_ops.spark.column.isNull(), null_str).otherwise(casted)
+                nullable = False
             return index_ops._with_new_scol(
-                scol.alias(index_ops._internal.data_spark_column_names[0]),
-                field=InternalField(dtype=dtype),
+                scol,
+                field=index_ops._internal.data_fields[0].copy(
+                    dtype=dtype, spark_type=spark_type, nullable=nullable
+                ),
             )
         else:
             return _as_other_type(index_ops, dtype, spark_type)
diff --git a/python/pyspark/pandas/data_type_ops/categorical_ops.py b/python/pyspark/pandas/data_type_ops/categorical_ops.py
index 8c2a27b..9238e6b 100644
--- a/python/pyspark/pandas/data_type_ops/categorical_ops.py
+++ b/python/pyspark/pandas/data_type_ops/categorical_ops.py
@@ -48,7 +48,7 @@ class CategoricalOps(DataTypeOps):
         return col.cat.codes
 
     def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
-        dtype, spark_type = pandas_on_spark_type(dtype)
+        dtype, _ = pandas_on_spark_type(dtype)
 
         if isinstance(dtype, CategoricalDtype) and dtype.categories is None:
             return index_ops.copy()
@@ -62,9 +62,7 @@ class CategoricalOps(DataTypeOps):
             )
             map_scol = F.create_map(*kvs)
             scol = map_scol.getItem(index_ops.spark.column)
-        return index_ops._with_new_scol(
-            scol.alias(index_ops._internal.data_spark_column_names[0])
-        ).astype(dtype)
+        return index_ops._with_new_scol(scol).astype(dtype)
 
     # TODO(SPARK-35997): Implement comparison operators below
     def lt(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
diff --git a/python/pyspark/pandas/data_type_ops/date_ops.py b/python/pyspark/pandas/data_type_ops/date_ops.py
index 86fe8c3..59c8166 100644
--- a/python/pyspark/pandas/data_type_ops/date_ops.py
+++ b/python/pyspark/pandas/data_type_ops/date_ops.py
@@ -19,6 +19,7 @@ import datetime
 import warnings
 from typing import Any, Union
 
+import numpy as np
 import pandas as pd
 from pandas.api.types import CategoricalDtype
 
@@ -29,7 +30,6 @@ from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
 from pyspark.pandas.base import column_op, IndexOpsMixin
 from pyspark.pandas.data_type_ops.base import (
     DataTypeOps,
-    _as_bool_type,
     _as_categorical_type,
     _as_other_type,
     _as_string_type,
@@ -104,7 +104,12 @@ class DateOps(DataTypeOps):
         if isinstance(dtype, CategoricalDtype):
             return _as_categorical_type(index_ops, dtype, spark_type)
         elif isinstance(spark_type, BooleanType):
-            return _as_bool_type(index_ops, dtype)
+            return index_ops._with_new_scol(
+                index_ops.spark.column.isNotNull(),
+                field=index_ops._internal.data_fields[0].copy(
+                    dtype=np.dtype(bool), spark_type=spark_type, nullable=False
+                ),
+            )
         elif isinstance(spark_type, StringType):
             return _as_string_type(index_ops, dtype, null_str=str(pd.NaT))
         else:
diff --git a/python/pyspark/pandas/data_type_ops/datetime_ops.py b/python/pyspark/pandas/data_type_ops/datetime_ops.py
index a30dc96..f815742 100644
--- a/python/pyspark/pandas/data_type_ops/datetime_ops.py
+++ b/python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -19,11 +19,12 @@ import datetime
 import warnings
 from typing import Any, Union, cast
 
+import numpy as np
 import pandas as pd
 from pandas.api.types import CategoricalDtype
 
 from pyspark.sql import functions as F, Column
-from pyspark.sql.types import BooleanType, StringType, TimestampType
+from pyspark.sql.types import BooleanType, LongType, StringType, TimestampType
 
 from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
 from pyspark.pandas.base import IndexOpsMixin
@@ -33,9 +34,8 @@ from pyspark.pandas.data_type_ops.base import (
     _as_categorical_type,
     _as_other_type,
 )
-from pyspark.pandas.internal import InternalField
 from pyspark.pandas.spark import functions as SF
-from pyspark.pandas.typedef import as_spark_type, extension_dtypes, pandas_on_spark_type
+from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type
 
 
 class DatetimeOps(DataTypeOps):
@@ -62,8 +62,11 @@ class DatetimeOps(DataTypeOps):
             warnings.warn(msg, UserWarning)
             return cast(
                 SeriesOrIndex,
-                left.spark.transform(
-                    lambda scol: scol.astype("long") - SF.lit(right).cast(as_spark_type("long"))
+                left._with_new_scol(
+                    left.spark.column.cast(LongType()) - SF.lit(right).cast(LongType()),
+                    field=left._internal.data_fields[0].copy(
+                        dtype=np.dtype("int64"), spark_type=LongType()
+                    ),
                 ),
             )
         else:
@@ -81,8 +84,11 @@ class DatetimeOps(DataTypeOps):
             warnings.warn(msg, UserWarning)
             return cast(
                 SeriesOrIndex,
-                left.spark.transform(
-                    lambda scol: SF.lit(right).cast(as_spark_type("long")) - scol.astype("long")
+                left._with_new_scol(
+                    SF.lit(right).cast(LongType()) - left.spark.column.cast(LongType()),
+                    field=left._internal.data_fields[0].copy(
+                        dtype=np.dtype("int64"), spark_type=LongType()
+                    ),
                 ),
             )
         else:
@@ -131,7 +137,7 @@ class DatetimeOps(DataTypeOps):
                 scol = F.when(index_ops.spark.column.isNull(), null_str).otherwise(casted)
             return index_ops._with_new_scol(
                 scol.alias(index_ops._internal.data_spark_column_names[0]),
-                field=InternalField(dtype=dtype),
+                field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
             )
         else:
             return _as_other_type(index_ops, dtype, spark_type)
diff --git a/python/pyspark/pandas/data_type_ops/num_ops.py b/python/pyspark/pandas/data_type_ops/num_ops.py
index 8b26843..ed089e5 100644
--- a/python/pyspark/pandas/data_type_ops/num_ops.py
+++ b/python/pyspark/pandas/data_type_ops/num_ops.py
@@ -33,9 +33,8 @@ from pyspark.pandas.data_type_ops.base import (
     _as_other_type,
     _as_string_type,
 )
-from pyspark.pandas.internal import InternalField
 from pyspark.pandas.spark import functions as SF
-from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type
+from pyspark.pandas.typedef.typehints import extension_dtypes, pandas_on_spark_type
 from pyspark.sql import functions as F
 from pyspark.sql.column import Column
 from pyspark.sql.types import (
@@ -55,38 +54,34 @@ class NumericOps(DataTypeOps):
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("Addition can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return column_op(Column.__add__)(left, right)
 
     def sub(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("Subtraction can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return column_op(Column.__sub__)(left, right)
 
     def mod(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("Modulo can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
         def mod(left: Column, right: Any) -> Column:
             return ((left % right) + right) % right
 
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return column_op(mod)(left, right)
 
     def pow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("Exponentiation can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
         def pow_func(left: Column, right: Any) -> Column:
             return F.when(left == 1, left).otherwise(Column.__pow__(left, right))
 
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return column_op(pow_func)(left, right)
 
     def radd(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -179,29 +174,25 @@ class IntegralOps(NumericOps):
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("Multiplication can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return column_op(Column.__mul__)(left, right)
 
     def truediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("True division can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
         def truediv(left: Column, right: Any) -> Column:
             return F.when(
                 SF.lit(right != 0) | SF.lit(right).isNull(), left.__div__(right)
             ).otherwise(SF.lit(np.inf).__div__(left))
 
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return numpy_column_op(truediv)(left, right)
 
     def floordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("Floor division can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
         def floordiv(left: Column, right: Any) -> Column:
             return F.when(SF.lit(right is np.nan), np.nan).otherwise(
                 F.when(
@@ -209,6 +200,7 @@ class IntegralOps(NumericOps):
                 ).otherwise(SF.lit(np.inf).__div__(left))
             )
 
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return numpy_column_op(floordiv)(left, right)
 
     def rtruediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -220,7 +212,7 @@ class IntegralOps(NumericOps):
                 SF.lit(right).__truediv__(left)
             )
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return numpy_column_op(rtruediv)(left, right)
 
     def rfloordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -232,7 +224,7 @@ class IntegralOps(NumericOps):
                 F.floor(SF.lit(right).__div__(left))
             )
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return numpy_column_op(rfloordiv)(left, right)
 
     def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
@@ -262,16 +254,13 @@ class FractionalOps(NumericOps):
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("Multiplication can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return column_op(Column.__mul__)(left, right)
 
     def truediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("True division can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
         def truediv(left: Column, right: Any) -> Column:
             return F.when(
                 SF.lit(right != 0) | SF.lit(right).isNull(), left.__div__(right)
@@ -281,14 +270,13 @@ class FractionalOps(NumericOps):
                 )
             )
 
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return numpy_column_op(truediv)(left, right)
 
     def floordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if not is_valid_operand_for_numeric_arithmetic(right):
             raise TypeError("Floor division can not be applied to given types.")
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
-
         def floordiv(left: Column, right: Any) -> Column:
             return F.when(SF.lit(right is np.nan), np.nan).otherwise(
                 F.when(
@@ -300,6 +288,7 @@ class FractionalOps(NumericOps):
                 )
             )
 
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return numpy_column_op(floordiv)(left, right)
 
     def rtruediv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -311,7 +300,7 @@ class FractionalOps(NumericOps):
                 SF.lit(right).__truediv__(left)
             )
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return numpy_column_op(rtruediv)(left, right)
 
     def rfloordiv(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
@@ -325,7 +314,7 @@ class FractionalOps(NumericOps):
                 )
             )
 
-        right = transform_boolean_operand_to_numeric(right, left.spark.data_type)
+        right = transform_boolean_operand_to_numeric(right, spark_type=left.spark.data_type)
         return numpy_column_op(rfloordiv)(left, right)
 
     def isnull(self, index_ops: IndexOpsLike) -> IndexOpsLike:
@@ -351,7 +340,7 @@ class FractionalOps(NumericOps):
                 ).otherwise(index_ops.spark.column.cast(spark_type))
             return index_ops._with_new_scol(
                 scol.alias(index_ops._internal.data_spark_column_names[0]),
-                field=InternalField(dtype=dtype),
+                field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
             )
         elif isinstance(spark_type, StringType):
             return _as_string_type(index_ops, dtype, null_str=str(np.nan))
diff --git a/python/pyspark/pandas/data_type_ops/string_ops.py b/python/pyspark/pandas/data_type_ops/string_ops.py
index 9c6ca4c..b2c4259 100644
--- a/python/pyspark/pandas/data_type_ops/string_ops.py
+++ b/python/pyspark/pandas/data_type_ops/string_ops.py
@@ -31,7 +31,6 @@ from pyspark.pandas.data_type_ops.base import (
     _as_other_type,
     _as_string_type,
 )
-from pyspark.pandas.internal import InternalField
 from pyspark.pandas.spark import functions as SF
 from pyspark.pandas.typedef import extension_dtypes, pandas_on_spark_type
 from pyspark.sql import Column
@@ -48,19 +47,31 @@ class StringOps(DataTypeOps):
         return "strings"
 
     def add(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
-        if isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, StringType):
+        if isinstance(right, str):
+            return cast(
+                SeriesOrIndex,
+                left._with_new_scol(
+                    F.concat(left.spark.column, SF.lit(right)), field=left._internal.data_fields[0]
+                ),
+            )
+        elif isinstance(right, IndexOpsMixin) and isinstance(right.spark.data_type, StringType):
             return column_op(F.concat)(left, right)
-        elif isinstance(right, str):
-            return column_op(F.concat)(left, SF.lit(right))
         else:
             raise TypeError("Addition can not be applied to given types.")
 
     def mul(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
-        if (
+        if isinstance(right, int):
+            return cast(
+                SeriesOrIndex,
+                left._with_new_scol(
+                    SF.repeat(left.spark.column, right), field=left._internal.data_fields[0]
+                ),
+            )
+        elif (
             isinstance(right, IndexOpsMixin)
             and isinstance(right.spark.data_type, IntegralType)
             and not isinstance(right.dtype, CategoricalDtype)
-        ) or isinstance(right, int):
+        ):
             return column_op(SF.repeat)(left, right)
         else:
             raise TypeError("Multiplication can not be applied to given types.")
@@ -69,14 +80,21 @@ class StringOps(DataTypeOps):
         if isinstance(right, str):
             return cast(
                 SeriesOrIndex,
-                left._with_new_scol(F.concat(SF.lit(right), left.spark.column)),  # TODO: dtype?
+                left._with_new_scol(
+                    F.concat(SF.lit(right), left.spark.column), field=left._internal.data_fields[0]
+                ),
             )
         else:
             raise TypeError("Addition can not be applied to given types.")
 
     def rmul(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
         if isinstance(right, int):
-            return column_op(SF.repeat)(left, right)
+            return cast(
+                SeriesOrIndex,
+                left._with_new_scol(
+                    SF.repeat(left.spark.column, right), field=left._internal.data_fields[0]
+                ),
+            )
         else:
             raise TypeError("Multiplication can not be applied to given types.")
 
@@ -114,8 +132,8 @@ class StringOps(DataTypeOps):
                     F.length(index_ops.spark.column) > 0
                 )
             return index_ops._with_new_scol(
-                scol.alias(index_ops._internal.data_spark_column_names[0]),
-                field=InternalField(dtype=dtype),
+                scol,
+                field=index_ops._internal.data_fields[0].copy(dtype=dtype, spark_type=spark_type),
             )
         elif isinstance(spark_type, StringType):
             return _as_string_type(index_ops, dtype)
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
index fef9fb1..fa37df0 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
@@ -26,7 +26,10 @@ from pandas.api.types import CategoricalDtype
 from pyspark import pandas as ps
 from pyspark.pandas.config import option_context
 from pyspark.pandas.tests.data_type_ops.testing_utils import TestCasesUtils
-from pyspark.pandas.typedef.typehints import extension_object_dtypes_available
+from pyspark.pandas.typedef.typehints import (
+    extension_float_dtypes_available,
+    extension_object_dtypes_available,
+)
 from pyspark.sql.types import BooleanType
 from pyspark.testing.pandasutils import PandasOnSparkTestCase
 
@@ -399,8 +402,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
     def test_add(self):
         pser = self.pser
         psser = self.psser
-        self.assert_eq((pser + 1).astype(float), psser + 1)
-        self.assert_eq((pser + 0.1).astype(float), psser + 0.1)
+        self.check_extension(pser + 1, psser + 1)
+        if extension_float_dtypes_available:
+            self.check_extension(pser + 0.1, psser + 0.1)
+        else:
+            self.assert_eq(pser + 0.1, psser + 0.1)
 
         # In pandas, NA | True is NA, whereas NA | True is True in pandas-on-Spark
         self.check_extension(ps.Series([True, True, True], dtype="boolean"), psser + True)
@@ -420,8 +426,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
     def test_sub(self):
         pser = self.pser
         psser = self.psser
-        self.assert_eq((pser - 1).astype(float), psser - 1)
-        self.assert_eq((pser - 0.1).astype(float), psser - 0.1)
+        self.check_extension(pser - 1, psser - 1)
+        if extension_float_dtypes_available:
+            self.check_extension(pser - 0.1, psser - 0.1)
+        else:
+            self.assert_eq(pser - 0.1, psser - 0.1)
         self.assertRaises(TypeError, lambda: psser - psser)
         self.assertRaises(TypeError, lambda: psser - True)
 
@@ -434,8 +443,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
     def test_mul(self):
         pser = self.pser
         psser = self.psser
-        self.assert_eq((pser * 1).astype(float), psser * 1)
-        self.assert_eq((pser * 0.1).astype(float), psser * 0.1)
+        self.check_extension(pser * 1, psser * 1)
+        if extension_float_dtypes_available:
+            self.check_extension(pser * 0.1, psser * 0.1)
+        else:
+            self.assert_eq(pser * 0.1, psser * 0.1)
 
         # In pandas, NA & False is NA, whereas NA & False is False in pandas-on-Spark
         self.check_extension(pser * True, psser * True)
@@ -455,8 +467,12 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
     def test_truediv(self):
         pser = self.pser
         psser = self.psser
-        self.assert_eq((pser / 1).astype(float), psser / 1)
-        self.assert_eq((pser / 0.1).astype(float), psser / 0.1)
+        if extension_float_dtypes_available:
+            self.check_extension(pser / 1, psser / 1)
+            self.check_extension(pser / 0.1, psser / 0.1)
+        else:
+            self.assert_eq(pser / 1, psser / 1)
+            self.assert_eq(pser / 0.1, psser / 0.1)
         self.assertRaises(TypeError, lambda: psser / psser)
         self.assertRaises(TypeError, lambda: psser / True)
 
@@ -474,7 +490,10 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         psser = self.psser
 
         # float is always returned in pandas-on-Spark
-        self.assert_eq((pser // 1).astype("float"), psser // 1)
+        if extension_float_dtypes_available:
+            self.check_extension((pser // 1).astype("Float64"), psser // 1)
+        else:
+            self.assert_eq((pser // 1).astype("float"), psser // 1)
 
         # in pandas, 1 // 0.1 = 9.0; in pandas-on-Spark, 1 // 0.1 = 10.0
         # self.assert_eq(pser // 0.1, psser // 0.1)
@@ -494,8 +513,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
     def test_mod(self):
         pser = self.pser
         psser = self.psser
-        self.assert_eq((pser % 1).astype(float), psser % 1)
-        self.assert_eq((pser % 0.1).astype(float), psser % 0.1)
+        self.check_extension(pser % 1, psser % 1)
+        if extension_float_dtypes_available:
+            self.check_extension(pser % 0.1, psser % 0.1)
+        else:
+            self.assert_eq(pser % 0.1, psser % 0.1)
         self.assertRaises(TypeError, lambda: psser % psser)
         self.assertRaises(TypeError, lambda: psser % True)
 
@@ -509,9 +531,18 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         pser = self.pser
         psser = self.psser
         # float is always returned in pandas-on-Spark
-        self.assert_eq((pser ** 1).astype("float"), psser ** 1)
-        self.assert_eq((pser ** 0.1).astype("float"), self.psser ** 0.1)
-        self.assert_eq((pser ** pser.astype(float)).astype("float"), psser ** psser.astype(float))
+        if extension_float_dtypes_available:
+            self.check_extension((pser ** 1).astype("Float64"), psser ** 1)
+            self.check_extension((pser ** 0.1).astype("Float64"), self.psser ** 0.1)
+            self.check_extension(
+                (pser ** pser.astype(float)).astype("Float64"), psser ** psser.astype(float)
+            )
+        else:
+            self.assert_eq((pser ** 1).astype("float"), psser ** 1)
+            self.assert_eq((pser ** 0.1).astype("float"), self.psser ** 0.1)
+            self.assert_eq(
+                (pser ** pser.astype(float)).astype("float"), psser ** psser.astype(float)
+            )
         self.assertRaises(TypeError, lambda: psser ** psser)
         self.assertRaises(TypeError, lambda: psser ** True)
 
@@ -526,8 +557,11 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
                 self.assertRaises(TypeError, lambda: self.psser ** psser)
 
     def test_radd(self):
-        self.assert_eq((1 + self.pser).astype(float), 1 + self.psser)
-        self.assert_eq((0.1 + self.pser).astype(float), 0.1 + self.psser)
+        self.check_extension(1 + self.pser, 1 + self.psser)
+        if extension_float_dtypes_available:
+            self.check_extension(0.1 + self.pser, 0.1 + self.psser)
+        else:
+            self.assert_eq(0.1 + self.pser, 0.1 + self.psser)
         self.assertRaises(TypeError, lambda: "x" + self.psser)
 
         # In pandas, NA | True is NA, whereas NA | True is True in pandas-on-Spark
@@ -538,16 +572,22 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) + self.psser)
 
     def test_rsub(self):
-        self.assert_eq((1 - self.pser).astype(float), 1 - self.psser)
-        self.assert_eq((0.1 - self.pser).astype(float), 0.1 - self.psser)
+        self.check_extension(1 - self.pser, 1 - self.psser)
+        if extension_float_dtypes_available:
+            self.check_extension(0.1 - self.pser, 0.1 - self.psser)
+        else:
+            self.assert_eq(0.1 - self.pser, 0.1 - self.psser)
         self.assertRaises(TypeError, lambda: "x" - self.psser)
         self.assertRaises(TypeError, lambda: True - self.psser)
         self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) - self.psser)
         self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) - self.psser)
 
     def test_rmul(self):
-        self.assert_eq((1 * self.pser).astype(float), 1 * self.psser)
-        self.assert_eq((0.1 * self.pser).astype(float), 0.1 * self.psser)
+        self.check_extension(1 * self.pser, 1 * self.psser)
+        if extension_float_dtypes_available:
+            self.check_extension(0.1 * self.pser, 0.1 * self.psser)
+        else:
+            self.assert_eq(0.1 * self.pser, 0.1 * self.psser)
         self.assertRaises(TypeError, lambda: "x" * self.psser)
 
         # In pandas, NA & False is NA, whereas NA & False is False in pandas-on-Spark
@@ -558,35 +598,49 @@ class BooleanExtensionOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) * self.psser)
 
     def test_rtruediv(self):
-        self.assert_eq((1 / self.pser).astype(float), 1 / self.psser)
-        self.assert_eq((0.1 / self.pser).astype(float), 0.1 / self.psser)
+        if extension_float_dtypes_available:
+            self.check_extension(1 / self.pser, 1 / self.psser)
+            self.check_extension(0.1 / self.pser, 0.1 / self.psser)
+        else:
+            self.assert_eq(1 / self.pser, 1 / self.psser)
+            self.assert_eq(0.1 / self.pser, 0.1 / self.psser)
         self.assertRaises(TypeError, lambda: "x" / self.psser)
         self.assertRaises(TypeError, lambda: True / self.psser)
         self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) / self.psser)
         self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) / self.psser)
 
     def test_rfloordiv(self):
-        self.assert_eq((1 // self.psser).astype(float), ps.Series([1.0, np.inf, np.nan]))
-        self.assert_eq((0.1 // self.psser).astype(float), ps.Series([0.0, np.inf, np.nan]))
+        self.assert_eq(pd.Series([1.0, np.inf, np.nan]), (1 // self.psser).astype(float))
+        self.assert_eq(pd.Series([0.0, np.inf, np.nan]), (0.1 // self.psser).astype(float))
         self.assertRaises(TypeError, lambda: "x" // self.psser)
         self.assertRaises(TypeError, lambda: True // self.psser)
         self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) // self.psser)
         self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) // self.psser)
 
     def test_rpow(self):
-        self.assert_eq(1 ** self.psser, ps.Series([1, 1, 1], dtype=float))
-        self.assert_eq((0.1 ** self.pser).astype(float), 0.1 ** self.psser)
+        if extension_float_dtypes_available:
+            self.check_extension(pd.Series([1, 1, 1], dtype="Float64"), 1 ** self.psser)
+            self.check_extension((0.1 ** self.pser).astype("Float64"), 0.1 ** self.psser)
+        else:
+            self.assert_eq(pd.Series([1, 1, 1], dtype="float"), 1 ** self.psser)
+            self.assert_eq((0.1 ** self.pser).astype("float"), 0.1 ** self.psser)
         self.assertRaises(TypeError, lambda: "x" ** self.psser)
         self.assertRaises(TypeError, lambda: True ** self.psser)
         self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) ** self.psser)
         self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) ** self.psser)
 
     def test_rmod(self):
-        self.assert_eq(ps.Series([0, np.nan, np.nan], dtype=float), 1 % self.psser)
-        self.assert_eq(
-            ps.Series([0.10000000000000009, np.nan, np.nan], dtype=float),
-            0.1 % self.psser,
-        )
+        self.check_extension(ps.Series([0, np.nan, np.nan], dtype="Int64"), 1 % self.psser)
+        if extension_float_dtypes_available:
+            self.check_extension(
+                pd.Series([0.10000000000000009, np.nan, np.nan], dtype="Float64"),
+                0.1 % self.psser,
+            )
+        else:
+            self.assert_eq(
+                pd.Series([0.10000000000000009, np.nan, np.nan], dtype="float"),
+                0.1 % self.psser,
+            )
         self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) % self.psser)
         self.assertRaises(TypeError, lambda: True % self.psser)
 
diff --git a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
index edfe806..1574ebf 100644
--- a/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
+++ b/python/pyspark/pandas/tests/data_type_ops/test_date_ops.py
@@ -190,7 +190,7 @@ class DateOpsTest(PandasOnSparkTestCase, TestCasesUtils):
         pser = self.pser
         psser = self.psser
         self.assert_eq(pser.astype(str), psser.astype(str))
-        self.assert_eq(pd.Series([None, None, None]), psser.astype(bool))
+        self.assert_eq(pser.astype(bool), psser.astype(bool))
         cat_type = CategoricalDtype(categories=["a", "b", "c"])
         self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
 
diff --git a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
index fc843c4..4bda305 100644
--- a/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
+++ b/python/pyspark/pandas/tests/data_type_ops/testing_utils.py
@@ -164,7 +164,7 @@ class TestCasesUtils(object):
             + self.integral_extension_dtypes
         )
 
-    def check_extension(self, psser, pser):
+    def check_extension(self, left, right):
         """
         Compare `psser` and `pser` of numeric ExtensionDtypes.
 
@@ -172,7 +172,8 @@ class TestCasesUtils(object):
         pandas versions. Please refer to https://github.com/pandas-dev/pandas/issues/39410.
         """
         if LooseVersion("1.1") <= LooseVersion(pd.__version__) < LooseVersion("1.2.2"):
-            self.assert_eq(psser, pser, check_exact=False)
-            self.assertTrue(isinstance(psser.dtype, extension_dtypes))
+            self.assert_eq(left, right, check_exact=False)
+            self.assertTrue(isinstance(left.dtype, extension_dtypes))
+            self.assertTrue(isinstance(right.dtype, extension_dtypes))
         else:
-            self.assert_eq(psser, pser)
+            self.assert_eq(left, right)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org