You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by xi...@apache.org on 2023/06/29 21:55:16 UTC
[spark] branch master updated: [SPARK-44150][PYTHON][FOLLOW-UP] Revert commits
This is an automated email from the ASF dual-hosted git repository.
xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e505244460b [SPARK-44150][PYTHON][FOLLOW-UP] Revert commits
e505244460b is described below
commit e505244460baa49f862d36333792c9d924cb4dde
Author: Xinrong Meng <xi...@apache.org>
AuthorDate: Thu Jun 29 14:55:03 2023 -0700
[SPARK-44150][PYTHON][FOLLOW-UP] Revert commits
### What changes were proposed in this pull request?
Revert two commits of [SPARK-44150] that block master CI.
### Why are the changes needed?
The two reverted [SPARK-44150] commits break the master CI build; reverting them unblocks CI.
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
N/A
Closes #41799 from xinrong-meng/revert.
Authored-by: Xinrong Meng <xi...@apache.org>
Signed-off-by: Xinrong Meng <xi...@apache.org>
---
python/pyspark/sql/pandas/serializers.py | 32 +++----------------
python/pyspark/sql/tests/test_arrow_python_udf.py | 39 -----------------------
python/pyspark/worker.py | 3 --
3 files changed, 5 insertions(+), 69 deletions(-)
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index 12d4c3077fe..307fcc33752 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -190,7 +190,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
)
return converter(s)
- def _create_array(self, series, arrow_type, spark_type=None, arrow_cast=False):
+ def _create_array(self, series, arrow_type, spark_type=None):
"""
Create an Arrow Array from the given pandas.Series and optional type.
@@ -202,9 +202,6 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
If None, pyarrow's inferred type will be used
spark_type : DataType, optional
If None, spark type converted from arrow_type will be used
- arrow_cast: bool, optional
- Whether to apply Arrow casting when the user-specified return type mismatches the
- actual return values.
Returns
-------
@@ -229,14 +226,7 @@ class ArrowStreamPandasSerializer(ArrowStreamSerializer):
else:
mask = series.isnull()
try:
- if arrow_cast:
- return pa.Array.from_pandas(series, mask=mask).cast(
- target_type=arrow_type, safe=self._safecheck
- )
- else:
- return pa.Array.from_pandas(
- series, mask=mask, type=arrow_type, safe=self._safecheck
- )
+ return pa.Array.from_pandas(series, mask=mask, type=arrow_type, safe=self._safecheck)
except TypeError as e:
error_msg = (
"Exception thrown when converting pandas.Series (%s) "
@@ -329,14 +319,12 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
df_for_struct=False,
struct_in_pandas="dict",
ndarray_as_list=False,
- arrow_cast=False,
):
super(ArrowStreamPandasUDFSerializer, self).__init__(timezone, safecheck)
self._assign_cols_by_name = assign_cols_by_name
self._df_for_struct = df_for_struct
self._struct_in_pandas = struct_in_pandas
self._ndarray_as_list = ndarray_as_list
- self._arrow_cast = arrow_cast
def arrow_to_pandas(self, arrow_column):
import pyarrow.types as types
@@ -398,13 +386,7 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
# Assign result columns by schema name if user labeled with strings
elif self._assign_cols_by_name and any(isinstance(name, str) for name in s.columns):
arrs_names = [
- (
- self._create_array(
- s[field.name], field.type, arrow_cast=self._arrow_cast
- ),
- field.name,
- )
- for field in t
+ (self._create_array(s[field.name], field.type), field.name) for field in t
]
# Assign result columns by position
else:
@@ -412,11 +394,7 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
# the selected series has name '1', so we rename it to field.name
# as the name is used by _create_array to provide a meaningful error message
(
- self._create_array(
- s[s.columns[i]].rename(field.name),
- field.type,
- arrow_cast=self._arrow_cast,
- ),
+ self._create_array(s[s.columns[i]].rename(field.name), field.type),
field.name,
)
for i, field in enumerate(t)
@@ -425,7 +403,7 @@ class ArrowStreamPandasUDFSerializer(ArrowStreamPandasSerializer):
struct_arrs, struct_names = zip(*arrs_names)
arrs.append(pa.StructArray.from_arrays(struct_arrs, struct_names))
else:
- arrs.append(self._create_array(s, t, arrow_cast=self._arrow_cast))
+ arrs.append(self._create_array(s, t))
return pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in range(len(arrs))])
diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py b/python/pyspark/sql/tests/test_arrow_python_udf.py
index 264ea0b901f..0accb0f3cc1 100644
--- a/python/pyspark/sql/tests/test_arrow_python_udf.py
+++ b/python/pyspark/sql/tests/test_arrow_python_udf.py
@@ -17,8 +17,6 @@
import unittest
-from pyspark.errors import PythonException
-from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.tests.test_udf import BaseUDFTestsMixin
from pyspark.testing.sqlutils import (
@@ -143,43 +141,6 @@ class PythonUDFArrowTestsMixin(BaseUDFTestsMixin):
"[[1, 2], [3, 4]]",
)
- def test_type_coercion_string_to_numeric(self):
- df_int_value = self.spark.createDataFrame(["1", "2"], schema="string")
- df_floating_value = self.spark.createDataFrame(["1.1", "2.2"], schema="string")
-
- int_ddl_types = ["tinyint", "smallint", "int", "bigint"]
- floating_ddl_types = ["double", "float"]
-
- for ddl_type in int_ddl_types:
- # df_int_value
- res = df_int_value.select(udf(lambda x: x, ddl_type)("value").alias("res"))
- self.assertEquals(res.collect(), [Row(res=1), Row(res=2)])
- self.assertEquals(res.dtypes[0][1], ddl_type)
-
- floating_results = [
- [Row(res=1.1), Row(res=2.2)],
- [Row(res=1.100000023841858), Row(res=2.200000047683716)],
- ]
- for ddl_type, floating_res in zip(floating_ddl_types, floating_results):
- # df_int_value
- res = df_int_value.select(udf(lambda x: x, ddl_type)("value").alias("res"))
- self.assertEquals(res.collect(), [Row(res=1.0), Row(res=2.0)])
- self.assertEquals(res.dtypes[0][1], ddl_type)
- # df_floating_value
- res = df_floating_value.select(udf(lambda x: x, ddl_type)("value").alias("res"))
- self.assertEquals(res.collect(), floating_res)
- self.assertEquals(res.dtypes[0][1], ddl_type)
-
- # invalid
- with self.assertRaises(PythonException):
- df_floating_value.select(udf(lambda x: x, "int")("value").alias("res")).collect()
-
- with self.assertRaises(PythonException):
- df_int_value.select(udf(lambda x: x, "decimal")("value").alias("res")).collect()
-
- with self.assertRaises(PythonException):
- df_floating_value.select(udf(lambda x: x, "decimal")("value").alias("res")).collect()
-
class PythonUDFArrowTests(PythonUDFArrowTestsMixin, ReusedSQLTestCase):
@classmethod
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 577286a7357..71a7ccd15aa 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -598,8 +598,6 @@ def read_udfs(pickleSer, infile, eval_type):
"row" if eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF else "dict"
)
ndarray_as_list = eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
- # Arrow-optimized Python UDF uses explicit Arrow cast for type coercion
- arrow_cast = eval_type == PythonEvalType.SQL_ARROW_BATCHED_UDF
ser = ArrowStreamPandasUDFSerializer(
timezone,
safecheck,
@@ -607,7 +605,6 @@ def read_udfs(pickleSer, infile, eval_type):
df_for_struct,
struct_in_pandas,
ndarray_as_list,
- arrow_cast,
)
else:
ser = BatchedSerializer(CPickleSerializer(), 100)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org