You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/02/13 00:47:32 UTC
spark git commit: [SPARK-23352][PYTHON][BRANCH-2.3] Explicitly
specify supported types in Pandas UDFs
Repository: spark
Updated Branches:
refs/heads/branch-2.3 befb22de8 -> 43f5e4067
[SPARK-23352][PYTHON][BRANCH-2.3] Explicitly specify supported types in Pandas UDFs
## What changes were proposed in this pull request?
This PR backports https://github.com/apache/spark/pull/20531:
It explicitly specifies supported types in Pandas UDFs.
The main change here is to add deduplicated, explicit type checking of `returnType` ahead of execution, along with documenting it; however, it also happens to fix multiple things.
1. Currently, we don't support `BinaryType` in Pandas UDFs; for example, see:
```python
from pyspark.sql.functions import pandas_udf
pudf = pandas_udf(lambda x: x, "binary")
df = spark.createDataFrame([[bytearray(1)]])
df.select(pudf("_1")).show()
```
```
...
TypeError: Unsupported type in conversion to Arrow: BinaryType
```
We can document this behaviour in the guide.
2. Since we can check the return type ahead of time, we can fail fast before actual execution.
```python
# we can fail fast at this stage because we know the schema ahead of time
pandas_udf(lambda x: x, BinaryType())
```
## How was this patch tested?
Tested manually; unit tests for `BinaryType` and `ArrayType(...)` were also added.
Author: hyukjinkwon <gu...@gmail.com>
Closes #20588 from HyukjinKwon/PR_TOOL_PICK_PR_20531_BRANCH-2.3.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43f5e406
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43f5e406
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43f5e406
Branch: refs/heads/branch-2.3
Commit: 43f5e40679f771326b2ee72f14cf1ab0ed2ad692
Parents: befb22d
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Feb 12 16:47:28 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Feb 12 16:47:28 2018 -0800
----------------------------------------------------------------------
docs/sql-programming-guide.md | 4 +-
python/pyspark/sql/tests.py | 86 ++++++++++++--------
python/pyspark/sql/types.py | 4 +
python/pyspark/sql/udf.py | 25 ++++--
.../org/apache/spark/sql/internal/SQLConf.scala | 2 +-
5 files changed, 77 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/43f5e406/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index dcef6e5..0f9f01e 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1676,7 +1676,7 @@ Using the above optimizations with Arrow will produce the same results as when A
enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
data types are currently supported and an error can be raised if a column has an unsupported type,
-see [Supported Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
+see [Supported SQL Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
Spark will fall back to create the DataFrame without Arrow.
## Pandas UDFs (a.k.a. Vectorized UDFs)
@@ -1734,7 +1734,7 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p
### Supported SQL Types
-Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
+Currently, all Spark SQL data types are supported by Arrow-based conversion except `BinaryType`, `MapType`,
`ArrayType` of `TimestampType`, and nested `StructType`.
### Setting Arrow Batch Size
http://git-wip-us.apache.org/repos/asf/spark/blob/43f5e406/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 5480144..904fa7a 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3736,10 +3736,10 @@ class PandasUDFTests(ReusedSQLTestCase):
self.assertEqual(foo.returnType, schema)
self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
- @pandas_udf(returnType='v double', functionType=PandasUDFType.SCALAR)
+ @pandas_udf(returnType='double', functionType=PandasUDFType.SCALAR)
def foo(x):
return x
- self.assertEqual(foo.returnType, schema)
+ self.assertEqual(foo.returnType, DoubleType())
self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
@@ -3776,7 +3776,7 @@ class PandasUDFTests(ReusedSQLTestCase):
@pandas_udf(returnType=PandasUDFType.GROUPED_MAP)
def foo(df):
return df
- with self.assertRaisesRegexp(ValueError, 'Invalid returnType'):
+ with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
@pandas_udf(returnType='double', functionType=PandasUDFType.GROUPED_MAP)
def foo(df):
return df
@@ -3825,7 +3825,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
return random_udf
def test_vectorized_udf_basic(self):
- from pyspark.sql.functions import pandas_udf, col
+ from pyspark.sql.functions import pandas_udf, col, array
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
col('id').cast('int').alias('int'),
@@ -3833,7 +3833,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
col('id').cast('float').alias('float'),
col('id').cast('double').alias('double'),
col('id').cast('decimal').alias('decimal'),
- col('id').cast('boolean').alias('bool'))
+ col('id').cast('boolean').alias('bool'),
+ array(col('id')).alias('array_long'))
f = lambda x: x
str_f = pandas_udf(f, StringType())
int_f = pandas_udf(f, IntegerType())
@@ -3842,10 +3843,11 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
double_f = pandas_udf(f, DoubleType())
decimal_f = pandas_udf(f, DecimalType())
bool_f = pandas_udf(f, BooleanType())
+ array_long_f = pandas_udf(f, ArrayType(LongType()))
res = df.select(str_f(col('str')), int_f(col('int')),
long_f(col('long')), float_f(col('float')),
double_f(col('double')), decimal_f('decimal'),
- bool_f(col('bool')))
+ bool_f(col('bool')), array_long_f('array_long'))
self.assertEquals(df.collect(), res.collect())
def test_register_nondeterministic_vectorized_udf_basic(self):
@@ -4050,10 +4052,11 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
def test_vectorized_udf_wrong_return_type(self):
from pyspark.sql.functions import pandas_udf, col
df = self.spark.range(10)
- f = pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
with QuietTest(self.sc):
- with self.assertRaisesRegexp(Exception, 'Unsupported.*type.*conversion'):
- df.select(f(col('id'))).collect()
+ with self.assertRaisesRegexp(
+ NotImplementedError,
+ 'Invalid returnType.*scalar Pandas UDF.*MapType'):
+ pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
def test_vectorized_udf_return_scalar(self):
from pyspark.sql.functions import pandas_udf, col
@@ -4088,13 +4091,18 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
self.assertEquals(df.collect(), res.collect())
def test_vectorized_udf_unsupported_types(self):
- from pyspark.sql.functions import pandas_udf, col
- schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
- df = self.spark.createDataFrame([(None,)], schema=schema)
- f = pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
+ from pyspark.sql.functions import pandas_udf
with QuietTest(self.sc):
- with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
- df.select(f(col('map'))).collect()
+ with self.assertRaisesRegexp(
+ NotImplementedError,
+ 'Invalid returnType.*scalar Pandas UDF.*MapType'):
+ pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
+
+ with QuietTest(self.sc):
+ with self.assertRaisesRegexp(
+ NotImplementedError,
+ 'Invalid returnType.*scalar Pandas UDF.*BinaryType'):
+ pandas_udf(lambda x: x, BinaryType())
def test_vectorized_udf_dates(self):
from pyspark.sql.functions import pandas_udf, col
@@ -4325,15 +4333,16 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
.withColumn("v", explode(col('vs'))).drop('vs')
- def test_simple(self):
- from pyspark.sql.functions import pandas_udf, PandasUDFType
- df = self.data
+ def test_supported_types(self):
+ from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col
+ df = self.data.withColumn("arr", array(col("id")))
foo_udf = pandas_udf(
lambda pdf: pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id),
StructType(
[StructField('id', LongType()),
StructField('v', IntegerType()),
+ StructField('arr', ArrayType(LongType())),
StructField('v1', DoubleType()),
StructField('v2', LongType())]),
PandasUDFType.GROUPED_MAP
@@ -4436,17 +4445,15 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
def test_wrong_return_type(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType
- df = self.data
-
- foo = pandas_udf(
- lambda pdf: pdf,
- 'id long, v map<int, int>',
- PandasUDFType.GROUPED_MAP
- )
with QuietTest(self.sc):
- with self.assertRaisesRegexp(Exception, 'Unsupported.*type.*conversion'):
- df.groupby('id').apply(foo).sort('id').toPandas()
+ with self.assertRaisesRegexp(
+ NotImplementedError,
+ 'Invalid returnType.*grouped map Pandas UDF.*MapType'):
+ pandas_udf(
+ lambda pdf: pdf,
+ 'id long, v map<int, int>',
+ PandasUDFType.GROUPED_MAP)
def test_wrong_args(self):
from pyspark.sql.functions import udf, pandas_udf, sum, PandasUDFType
@@ -4465,23 +4472,30 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
df.groupby('id').apply(
pandas_udf(lambda: 1, StructType([StructField("d", DoubleType())])))
with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
- df.groupby('id').apply(
- pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())])))
+ df.groupby('id').apply(pandas_udf(lambda x, y: x, DoubleType()))
with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUPED_MAP'):
df.groupby('id').apply(
- pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())]),
- PandasUDFType.SCALAR))
+ pandas_udf(lambda x, y: x, DoubleType(), PandasUDFType.SCALAR))
def test_unsupported_types(self):
- from pyspark.sql.functions import pandas_udf, col, PandasUDFType
+ from pyspark.sql.functions import pandas_udf, PandasUDFType
schema = StructType(
[StructField("id", LongType(), True),
StructField("map", MapType(StringType(), IntegerType()), True)])
- df = self.spark.createDataFrame([(1, None,)], schema=schema)
- f = pandas_udf(lambda x: x, df.schema, PandasUDFType.GROUPED_MAP)
with QuietTest(self.sc):
- with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
- df.groupby('id').apply(f).collect()
+ with self.assertRaisesRegexp(
+ NotImplementedError,
+ 'Invalid returnType.*grouped map Pandas UDF.*MapType'):
+ pandas_udf(lambda x: x, schema, PandasUDFType.GROUPED_MAP)
+
+ schema = StructType(
+ [StructField("id", LongType(), True),
+ StructField("arr_ts", ArrayType(TimestampType()), True)])
+ with QuietTest(self.sc):
+ with self.assertRaisesRegexp(
+ NotImplementedError,
+ 'Invalid returnType.*grouped map Pandas UDF.*ArrayType.*TimestampType'):
+ pandas_udf(lambda x: x, schema, PandasUDFType.GROUPED_MAP)
# Regression test for SPARK-23314
def test_timestamp_dst(self):
http://git-wip-us.apache.org/repos/asf/spark/blob/43f5e406/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index f7141b4..e25941c 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1638,6 +1638,8 @@ def to_arrow_type(dt):
# Timestamps should be in UTC, JVM Arrow timestamps require a timezone to be read
arrow_type = pa.timestamp('us', tz='UTC')
elif type(dt) == ArrayType:
+ if type(dt.elementType) == TimestampType:
+ raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
arrow_type = pa.list_(to_arrow_type(dt.elementType))
else:
raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
@@ -1680,6 +1682,8 @@ def from_arrow_type(at):
elif types.is_timestamp(at):
spark_type = TimestampType()
elif types.is_list(at):
+ if types.is_timestamp(at.value_type):
+ raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
spark_type = ArrayType(from_arrow_type(at.value_type))
else:
raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
http://git-wip-us.apache.org/repos/asf/spark/blob/43f5e406/python/pyspark/sql/udf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 5a848c2..2b16b71 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -22,7 +22,8 @@ import functools
from pyspark import SparkContext, since
from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType, ignore_unicode_prefix
from pyspark.sql.column import Column, _to_java_column, _to_seq
-from pyspark.sql.types import StringType, DataType, StructType, _parse_datatype_string
+from pyspark.sql.types import StringType, DataType, StructType, _parse_datatype_string, \
+ to_arrow_type, to_arrow_schema
__all__ = ["UDFRegistration"]
@@ -109,10 +110,24 @@ class UserDefinedFunction(object):
else:
self._returnType_placeholder = _parse_datatype_string(self._returnType)
- if self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF \
- and not isinstance(self._returnType_placeholder, StructType):
- raise ValueError("Invalid returnType: returnType must be a StructType for "
- "pandas_udf with function type GROUPED_MAP")
+ if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
+ try:
+ to_arrow_type(self._returnType_placeholder)
+ except TypeError:
+ raise NotImplementedError(
+ "Invalid returnType with scalar Pandas UDFs: %s is "
+ "not supported" % str(self._returnType_placeholder))
+ elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
+ if isinstance(self._returnType_placeholder, StructType):
+ try:
+ to_arrow_schema(self._returnType_placeholder)
+ except TypeError:
+ raise NotImplementedError(
+ "Invalid returnType with grouped map Pandas UDFs: "
+ "%s is not supported" % str(self._returnType_placeholder))
+ else:
+ raise TypeError("Invalid returnType for grouped map Pandas "
+ "UDFs: returnType must be a StructType.")
return self._returnType_placeholder
http://git-wip-us.apache.org/repos/asf/spark/blob/43f5e406/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e498f55..54d4f63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1052,7 +1052,7 @@ object SQLConf {
"for use with pyspark.sql.DataFrame.toPandas, and " +
"pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame. " +
"The following data types are unsupported: " +
- "MapType, ArrayType of TimestampType, and nested StructType.")
+ "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.")
.booleanConf
.createWithDefault(false)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org