Posted to commits@spark.apache.org by gu...@apache.org on 2018/02/12 11:49:46 UTC

spark git commit: [SPARK-23352][PYTHON] Explicitly specify supported types in Pandas UDFs

Repository: spark
Updated Branches:
  refs/heads/master 6efd5d117 -> c338c8cf8


[SPARK-23352][PYTHON] Explicitly specify supported types in Pandas UDFs

## What changes were proposed in this pull request?

This PR explicitly specifies the supported types in Pandas UDFs.
The main change is to add deduplicated, explicit type checking on `returnType` up front, together with documentation; along the way, it also fixes several related issues.

1. Currently, `BinaryType` is not supported in Pandas UDFs. For example:

    ```python
    from pyspark.sql.functions import pandas_udf
    pudf = pandas_udf(lambda x: x, "binary")
    df = spark.createDataFrame([[bytearray(1)]])
    df.select(pudf("_1")).show()
    ```
    ```
    ...
    TypeError: Unsupported type in conversion to Arrow: BinaryType
    ```

    We can document this behaviour in the SQL programming guide.
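
    A hedged aside, not part of this patch: a plain (non-vectorized) Python UDF does not go through Arrow, so it can handle binary columns in the meantime:

    ```python
    from pyspark.sql.functions import udf

    # Regular UDFs skip Arrow entirely, so a "binary" return type works.
    identity_binary = udf(lambda x: x, "binary")
    df.select(identity_binary("_1")).show()
    ```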

2. Also, the grouped aggregate Pandas UDF currently fails fast on `ArrayType`, but it seems we can support this case. For example:

    ```python
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    foo = pandas_udf(lambda v: v.mean(), 'array<double>', PandasUDFType.GROUPED_AGG)
    df = spark.range(100).selectExpr("id", "array(id) as value")
    df.groupBy("id").agg(foo("value")).show()
    ```

    ```
    ...
    NotImplementedError: ArrayType, StructType and MapType are not supported with PandasUDFType.GROUPED_AGG
    ```
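
    With the type check relaxed by this patch, the same snippet should run. A minimal sketch of the expected behaviour (mirroring the `avg(array(v))` case added to `test_manual` in the diff below):

    ```python
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    foo = pandas_udf(lambda v: v.mean(), 'array<double>', PandasUDFType.GROUPED_AGG)
    df = spark.range(100).selectExpr("id", "array(id) as value")
    # Each group holds a single id, so the mean of its arrays is e.g. id=3 -> [3.0].
    df.groupBy("id").agg(foo("value")).show()
    ```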

3. Since we can now check the return type ahead of time, we can fail fast before actual execution:

    ```python
    # We can fail fast at this stage because the return type is known ahead of time.
    pandas_udf(lambda x: x, BinaryType())
    ```
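
    With this patch, the call above raises immediately (message per the new check in `python/pyspark/sql/udf.py`):

    ```
    NotImplementedError: Invalid returnType with scalar Pandas UDFs: BinaryType is not supported
    ```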

## How was this patch tested?

Manually tested; unit tests for `BinaryType` and `ArrayType(...)` were also added.

Author: hyukjinkwon <gu...@gmail.com>

Closes #20531 from HyukjinKwon/pudf-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c338c8cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c338c8cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c338c8cf

Branch: refs/heads/master
Commit: c338c8cf8253c037ecd4f39bbd58ed5a86581b37
Parents: 6efd5d1
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Feb 12 20:49:36 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Mon Feb 12 20:49:36 2018 +0900

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |   4 +-
 python/pyspark/sql/tests.py                     | 130 +++++++++++--------
 python/pyspark/sql/types.py                     |   4 +
 python/pyspark/sql/udf.py                       |  36 +++--
 python/pyspark/worker.py                        |   2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   2 +-
 6 files changed, 111 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c338c8cf/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index eab4030..6174a93 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1676,7 +1676,7 @@ Using the above optimizations with Arrow will produce the same results as when A
 enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
 DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
 data types are currently supported and an error can be raised if a column has an unsupported type,
-see [Supported Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
+see [Supported SQL Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
 Spark will fall back to create the DataFrame without Arrow.
 
 ## Pandas UDFs (a.k.a. Vectorized UDFs)
@@ -1734,7 +1734,7 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p
 
 ### Supported SQL Types
 
-Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
+Currently, all Spark SQL data types are supported by Arrow-based conversion except `BinaryType`, `MapType`,
 `ArrayType` of `TimestampType`, and nested `StructType`.
 
 ### Setting Arrow Batch Size

http://git-wip-us.apache.org/repos/asf/spark/blob/c338c8cf/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index fe89bd0..2af218a 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3790,10 +3790,10 @@ class PandasUDFTests(ReusedSQLTestCase):
         self.assertEqual(foo.returnType, schema)
         self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
-        @pandas_udf(returnType='v double', functionType=PandasUDFType.SCALAR)
+        @pandas_udf(returnType='double', functionType=PandasUDFType.SCALAR)
         def foo(x):
             return x
-        self.assertEqual(foo.returnType, schema)
+        self.assertEqual(foo.returnType, DoubleType())
         self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
@@ -3830,7 +3830,7 @@ class PandasUDFTests(ReusedSQLTestCase):
                 @pandas_udf(returnType=PandasUDFType.GROUPED_MAP)
                 def foo(df):
                     return df
-            with self.assertRaisesRegexp(ValueError, 'Invalid returnType'):
+            with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
                 @pandas_udf(returnType='double', functionType=PandasUDFType.GROUPED_MAP)
                 def foo(df):
                     return df
@@ -3879,7 +3879,7 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         return random_udf
 
     def test_vectorized_udf_basic(self):
-        from pyspark.sql.functions import pandas_udf, col
+        from pyspark.sql.functions import pandas_udf, col, array
         df = self.spark.range(10).select(
             col('id').cast('string').alias('str'),
             col('id').cast('int').alias('int'),
@@ -3887,7 +3887,8 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
             col('id').cast('float').alias('float'),
             col('id').cast('double').alias('double'),
             col('id').cast('decimal').alias('decimal'),
-            col('id').cast('boolean').alias('bool'))
+            col('id').cast('boolean').alias('bool'),
+            array(col('id')).alias('array_long'))
         f = lambda x: x
         str_f = pandas_udf(f, StringType())
         int_f = pandas_udf(f, IntegerType())
@@ -3896,10 +3897,11 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         double_f = pandas_udf(f, DoubleType())
         decimal_f = pandas_udf(f, DecimalType())
         bool_f = pandas_udf(f, BooleanType())
+        array_long_f = pandas_udf(f, ArrayType(LongType()))
         res = df.select(str_f(col('str')), int_f(col('int')),
                         long_f(col('long')), float_f(col('float')),
                         double_f(col('double')), decimal_f('decimal'),
-                        bool_f(col('bool')))
+                        bool_f(col('bool')), array_long_f('array_long'))
         self.assertEquals(df.collect(), res.collect())
 
     def test_register_nondeterministic_vectorized_udf_basic(self):
@@ -4104,10 +4106,11 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
     def test_vectorized_udf_wrong_return_type(self):
         from pyspark.sql.functions import pandas_udf, col
         df = self.spark.range(10)
-        f = pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, 'Unsupported.*type.*conversion'):
-                df.select(f(col('id'))).collect()
+            with self.assertRaisesRegexp(
+                    NotImplementedError,
+                    'Invalid returnType.*scalar Pandas UDF.*MapType'):
+                pandas_udf(lambda x: x * 1.0, MapType(LongType(), LongType()))
 
     def test_vectorized_udf_return_scalar(self):
         from pyspark.sql.functions import pandas_udf, col
@@ -4142,13 +4145,18 @@ class ScalarPandasUDFTests(ReusedSQLTestCase):
         self.assertEquals(df.collect(), res.collect())
 
     def test_vectorized_udf_unsupported_types(self):
-        from pyspark.sql.functions import pandas_udf, col
-        schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)])
-        df = self.spark.createDataFrame([(None,)], schema=schema)
-        f = pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
+        from pyspark.sql.functions import pandas_udf
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
-                df.select(f(col('map'))).collect()
+            with self.assertRaisesRegexp(
+                    NotImplementedError,
+                    'Invalid returnType.*scalar Pandas UDF.*MapType'):
+                pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
+
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(
+                    NotImplementedError,
+                    'Invalid returnType.*scalar Pandas UDF.*BinaryType'):
+                pandas_udf(lambda x: x, BinaryType())
 
     def test_vectorized_udf_dates(self):
         from pyspark.sql.functions import pandas_udf, col
@@ -4379,15 +4387,16 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
             .withColumn("vs", array([lit(i) for i in range(20, 30)])) \
             .withColumn("v", explode(col('vs'))).drop('vs')
 
-    def test_simple(self):
-        from pyspark.sql.functions import pandas_udf, PandasUDFType
-        df = self.data
+    def test_supported_types(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col
+        df = self.data.withColumn("arr", array(col("id")))
 
         foo_udf = pandas_udf(
             lambda pdf: pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id),
             StructType(
                 [StructField('id', LongType()),
                  StructField('v', IntegerType()),
+                 StructField('arr', ArrayType(LongType())),
                  StructField('v1', DoubleType()),
                  StructField('v2', LongType())]),
             PandasUDFType.GROUPED_MAP
@@ -4490,17 +4499,15 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 
     def test_wrong_return_type(self):
         from pyspark.sql.functions import pandas_udf, PandasUDFType
-        df = self.data
-
-        foo = pandas_udf(
-            lambda pdf: pdf,
-            'id long, v map<int, int>',
-            PandasUDFType.GROUPED_MAP
-        )
 
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, 'Unsupported.*type.*conversion'):
-                df.groupby('id').apply(foo).sort('id').toPandas()
+            with self.assertRaisesRegexp(
+                    NotImplementedError,
+                    'Invalid returnType.*grouped map Pandas UDF.*MapType'):
+                pandas_udf(
+                    lambda pdf: pdf,
+                    'id long, v map<int, int>',
+                    PandasUDFType.GROUPED_MAP)
 
     def test_wrong_args(self):
         from pyspark.sql.functions import udf, pandas_udf, sum, PandasUDFType
@@ -4519,23 +4526,30 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
                 df.groupby('id').apply(
                     pandas_udf(lambda: 1, StructType([StructField("d", DoubleType())])))
             with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
-                df.groupby('id').apply(
-                    pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())])))
+                df.groupby('id').apply(pandas_udf(lambda x, y: x, DoubleType()))
             with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUPED_MAP'):
                 df.groupby('id').apply(
-                    pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())]),
-                               PandasUDFType.SCALAR))
+                    pandas_udf(lambda x, y: x, DoubleType(), PandasUDFType.SCALAR))
 
     def test_unsupported_types(self):
-        from pyspark.sql.functions import pandas_udf, col, PandasUDFType
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
         schema = StructType(
             [StructField("id", LongType(), True),
              StructField("map", MapType(StringType(), IntegerType()), True)])
-        df = self.spark.createDataFrame([(1, None,)], schema=schema)
-        f = pandas_udf(lambda x: x, df.schema, PandasUDFType.GROUPED_MAP)
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
-                df.groupby('id').apply(f).collect()
+            with self.assertRaisesRegexp(
+                    NotImplementedError,
+                    'Invalid returnType.*grouped map Pandas UDF.*MapType'):
+                pandas_udf(lambda x: x, schema, PandasUDFType.GROUPED_MAP)
+
+        schema = StructType(
+            [StructField("id", LongType(), True),
+             StructField("arr_ts", ArrayType(TimestampType()), True)])
+        with QuietTest(self.sc):
+            with self.assertRaisesRegexp(
+                    NotImplementedError,
+                    'Invalid returnType.*grouped map Pandas UDF.*ArrayType.*TimestampType'):
+                pandas_udf(lambda x: x, schema, PandasUDFType.GROUPED_MAP)
 
     # Regression test for SPARK-23314
     def test_timestamp_dst(self):
@@ -4614,23 +4628,32 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         return weighted_mean
 
     def test_manual(self):
+        from pyspark.sql.functions import pandas_udf, array
+
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
         mean_udf = self.pandas_agg_mean_udf
-
-        result1 = df.groupby('id').agg(sum_udf(df.v), mean_udf(df.v)).sort('id')
+        mean_arr_udf = pandas_udf(
+            self.pandas_agg_mean_udf.func,
+            ArrayType(self.pandas_agg_mean_udf.returnType),
+            self.pandas_agg_mean_udf.evalType)
+
+        result1 = df.groupby('id').agg(
+            sum_udf(df.v),
+            mean_udf(df.v),
+            mean_arr_udf(array(df.v))).sort('id')
         expected1 = self.spark.createDataFrame(
-            [[0, 245.0, 24.5],
-             [1, 255.0, 25.5],
-             [2, 265.0, 26.5],
-             [3, 275.0, 27.5],
-             [4, 285.0, 28.5],
-             [5, 295.0, 29.5],
-             [6, 305.0, 30.5],
-             [7, 315.0, 31.5],
-             [8, 325.0, 32.5],
-             [9, 335.0, 33.5]],
-            ['id', 'sum(v)', 'avg(v)'])
+            [[0, 245.0, 24.5, [24.5]],
+             [1, 255.0, 25.5, [25.5]],
+             [2, 265.0, 26.5, [26.5]],
+             [3, 275.0, 27.5, [27.5]],
+             [4, 285.0, 28.5, [28.5]],
+             [5, 295.0, 29.5, [29.5]],
+             [6, 305.0, 30.5, [30.5]],
+             [7, 315.0, 31.5, [31.5]],
+             [8, 325.0, 32.5, [32.5]],
+             [9, 335.0, 33.5, [33.5]]],
+            ['id', 'sum(v)', 'avg(v)', 'avg(array(v))'])
 
         self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
 
@@ -4667,14 +4690,15 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         self.assertPandasEqual(expected4.toPandas(), result4.toPandas())
 
     def test_unsupported_types(self):
-        from pyspark.sql.types import ArrayType, DoubleType, MapType
+        from pyspark.sql.types import DoubleType, MapType
         from pyspark.sql.functions import pandas_udf, PandasUDFType
 
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
-                @pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUPED_AGG)
-                def mean_and_std_udf(v):
-                    return [v.mean(), v.std()]
+                pandas_udf(
+                    lambda x: x,
+                    ArrayType(ArrayType(TimestampType())),
+                    PandasUDFType.GROUPED_AGG)
 
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):

http://git-wip-us.apache.org/repos/asf/spark/blob/c338c8cf/python/pyspark/sql/types.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index f7141b4..e25941c 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1638,6 +1638,8 @@ def to_arrow_type(dt):
         # Timestamps should be in UTC, JVM Arrow timestamps require a timezone to be read
         arrow_type = pa.timestamp('us', tz='UTC')
     elif type(dt) == ArrayType:
+        if type(dt.elementType) == TimestampType:
+            raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
         arrow_type = pa.list_(to_arrow_type(dt.elementType))
     else:
         raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
@@ -1680,6 +1682,8 @@ def from_arrow_type(at):
     elif types.is_timestamp(at):
         spark_type = TimestampType()
     elif types.is_list(at):
+        if types.is_timestamp(at.value_type):
+            raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
         spark_type = ArrayType(from_arrow_type(at.value_type))
     else:
         raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
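
A quick sketch of the new guard in `to_arrow_type` (the exact `str(dt)` rendering is assumed; the message format comes from this hunk):

```python
from pyspark.sql.types import ArrayType, TimestampType, to_arrow_type

# Arrays of timestamps are now rejected up front rather than failing later.
to_arrow_type(ArrayType(TimestampType()))
# TypeError: Unsupported type in conversion to Arrow: ArrayType(TimestampType,true)
```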

http://git-wip-us.apache.org/repos/asf/spark/blob/c338c8cf/python/pyspark/sql/udf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 08c6b9e..e5b35fc 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -23,7 +23,7 @@ from pyspark import SparkContext, since
 from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType, ignore_unicode_prefix
 from pyspark.sql.column import Column, _to_java_column, _to_seq
 from pyspark.sql.types import StringType, DataType, ArrayType, StructType, MapType, \
-    _parse_datatype_string
+    _parse_datatype_string, to_arrow_type, to_arrow_schema
 
 __all__ = ["UDFRegistration"]
 
@@ -112,15 +112,31 @@ class UserDefinedFunction(object):
             else:
                 self._returnType_placeholder = _parse_datatype_string(self._returnType)
 
-        if self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF \
-                and not isinstance(self._returnType_placeholder, StructType):
-            raise ValueError("Invalid returnType: returnType must be a StructType for "
-                             "pandas_udf with function type GROUPED_MAP")
-        elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF \
-                and isinstance(self._returnType_placeholder, (StructType, ArrayType, MapType)):
-            raise NotImplementedError(
-                "ArrayType, StructType and MapType are not supported with "
-                "PandasUDFType.GROUPED_AGG")
+        if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
+            try:
+                to_arrow_type(self._returnType_placeholder)
+            except TypeError:
+                raise NotImplementedError(
+                    "Invalid returnType with scalar Pandas UDFs: %s is "
+                    "not supported" % str(self._returnType_placeholder))
+        elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
+            if isinstance(self._returnType_placeholder, StructType):
+                try:
+                    to_arrow_schema(self._returnType_placeholder)
+                except TypeError:
+                    raise NotImplementedError(
+                        "Invalid returnType with grouped map Pandas UDFs: "
+                        "%s is not supported" % str(self._returnType_placeholder))
+            else:
+                raise TypeError("Invalid returnType for grouped map Pandas "
+                                "UDFs: returnType must be a StructType.")
+        elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
+            try:
+                to_arrow_type(self._returnType_placeholder)
+            except TypeError:
+                raise NotImplementedError(
+                    "Invalid returnType with grouped aggregate Pandas UDFs: "
+                    "%s is not supported" % str(self._returnType_placeholder))
 
         return self._returnType_placeholder
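
A hedged sketch of the resulting definition-time behaviour (each call is shown independently and raises as commented; cf. the updated tests above):

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import MapType, StringType, IntegerType

# Scalar Pandas UDF: a type Arrow cannot convert fails at definition time.
pandas_udf(lambda x: x, MapType(StringType(), IntegerType()))
# NotImplementedError: Invalid returnType with scalar Pandas UDFs: ... not supported

# Grouped map Pandas UDF: a non-StructType returnType is now a TypeError.
pandas_udf(lambda pdf: pdf, 'double', PandasUDFType.GROUPED_MAP)
# TypeError: Invalid returnType for grouped map Pandas UDFs: returnType must be a StructType.
```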
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c338c8cf/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 121b3dd..89a3a92 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -116,7 +116,7 @@ def wrap_grouped_agg_pandas_udf(f, return_type):
     def wrapped(*series):
         import pandas as pd
         result = f(*series)
-        return pd.Series(result)
+        return pd.Series([result])
 
     return lambda *a: (wrapped(*a), arrow_return_type)
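
The `pd.Series(result)` -> `pd.Series([result])` change above is what lets array-valued aggregates through intact; a plain-pandas illustration (assuming `result` is one group's aggregate value):

```python
import pandas as pd

result = [24.5]        # an ArrayType aggregate value for a single group
pd.Series(result)      # length-1 Series holding the scalar 24.5 (array structure lost)
pd.Series([result])    # length-1 Series whose single element is the list [24.5]
```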
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c338c8cf/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1e2501e..7835dba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1064,7 +1064,7 @@ object SQLConf {
         "for use with pyspark.sql.DataFrame.toPandas, and " +
         "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame. " +
         "The following data types are unsupported: " +
-        "MapType, ArrayType of TimestampType, and nested StructType.")
+        "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.")
       .booleanConf
       .createWithDefault(false)
 

