Posted to commits@spark.apache.org by gu...@apache.org on 2019/07/05 00:23:02 UTC

[spark] branch master updated: [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c55812  [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type
5c55812 is described below

commit 5c55812400e1e0a8aaeb50a50be106e80c916c86
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Fri Jul 5 09:22:41 2019 +0900

    [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type
    
    ## What changes were proposed in this pull request?
    
    This PR proposes to rename `mapPartitionsInPandas` to `mapInPandas` with a separate evaluation type.
    
    We had an offline discussion with rxin, mengxr and cloud-fan.
    
    The reasons are, in short:
    
    1. `SCALAR_ITER` doesn't make sense with `mapPartitionsInPandas`.
    2. Unlike `GROUPED_AGG`, the same Pandas UDF cannot be shared between, for instance, `select` and `mapPartitionsInPandas`, because the iterator's return type is different (see the contrast sketch after this list).
    3. `mapPartitionsInPandas` -> `mapInPandas` - see https://github.com/apache/spark/pull/25044#issuecomment-508298552 and https://github.com/apache/spark/pull/25044#issuecomment-508299764
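    
    To illustrate reason 2, a minimal contrast sketch (assuming an active
    `SparkSession` named `spark`; `plus_one` and `identity` are made-up names
    for illustration):
    
    ```python
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    df = spark.range(10)
    
    # SCALAR_ITER: iterator of pandas.Series -> iterator of pandas.Series;
    # usable inside select() like any column expression.
    @pandas_udf('long', PandasUDFType.SCALAR_ITER)
    def plus_one(iterator):
        for s in iterator:
            yield s + 1
    
    df.select(plus_one(df.id)).show()
    
    # MAP_ITER: iterator of pandas.DataFrame -> iterator of pandas.DataFrame;
    # usable only with mapInPandas(). The same UDF cannot serve both roles
    # because the element type of the iterator differs.
    @pandas_udf('id long', PandasUDFType.MAP_ITER)
    def identity(iterator):
        for pdf in iterator:
            yield pdf
    
    df.mapInPandas(identity).show()
    ```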
    
    Renaming `SCALAR_ITER` to `MAP_ITER` was abandoned because of reason 2.
    
    For the `XXX_ITER` types, a different interface might be needed in the future if other variants are added, but that is a topic orthogonal to `mapPartitionsInPandas`.
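    
    As an end-to-end example of the renamed API, a minimal sketch that mirrors
    the doctest updated in `dataframe.py` below (assumes an active
    `SparkSession` named `spark`):
    
    ```python
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
    
    # The UDF is declared with the new MAP_ITER type and a StructType schema;
    # it consumes an iterator of pandas.DataFrames per partition and yields
    # pandas.DataFrames of the same schema.
    @pandas_udf(df.schema, PandasUDFType.MAP_ITER)
    def filter_func(iterator):
        for pdf in iterator:
            yield pdf[pdf.id == 1]
    
    df.mapInPandas(filter_func).show()
    ```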
    
    ## How was this patch tested?
    
    Existing tests should cover this change.
    
    Closes #25044 from HyukjinKwon/SPARK-28198.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 python/pyspark/sql/dataframe.py                    | 13 +++++-------
 python/pyspark/sql/functions.py                    |  5 ++++-
 python/pyspark/sql/tests/test_pandas_udf_iter.py   | 24 +++++++++++-----------
 python/pyspark/sql/udf.py                          | 20 +++++++++++++++---
 python/pyspark/worker.py                           |  7 +++++--
 .../plans/logical/pythonLogicalOperators.scala     |  4 ++--
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 10 ++++-----
 .../spark/sql/execution/SparkStrategies.scala      |  4 ++--
 ...onsInPandasExec.scala => MapInPandasExec.scala} |  2 +-
 9 files changed, 52 insertions(+), 37 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 3f5d1ff..e666973 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2193,7 +2193,7 @@ class DataFrame(object):
                         _check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
             return pdf
 
-    def mapPartitionsInPandas(self, udf):
+    def mapInPandas(self, udf):
         """
         Maps each partition of the current :class:`DataFrame` using a pandas udf and returns
         the result as a `DataFrame`.
@@ -2215,7 +2215,7 @@ class DataFrame(object):
         ... def filter_func(iterator):
         ...     for pdf in iterator:
         ...         yield pdf[pdf.id == 1]
-        >>> df.mapPartitionsInPandas(filter_func).show()  # doctest: +SKIP
+        >>> df.mapInPandas(filter_func).show()  # doctest: +SKIP
         +---+---+
         | id|age|
         +---+---+
@@ -2227,15 +2227,12 @@ class DataFrame(object):
         """
         # Columns are special because hasattr always return True
         if isinstance(udf, Column) or not hasattr(udf, 'func') \
-                or udf.evalType != PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+                or udf.evalType != PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
             raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
-                             "SCALAR_ITER.")
-
-        if not isinstance(udf.returnType, StructType):
-            raise ValueError("The returnType of the pandas_udf must be a StructType")
+                             "MAP_ITER.")
 
         udf_column = udf(*[self[col] for col in self.columns])
-        jdf = self._jdf.mapPartitionsInPandas(udf_column._jc.expr())
+        jdf = self._jdf.mapInPandas(udf_column._jc.expr())
         return DataFrame(jdf, self.sql_ctx)
 
     def _collectAsArrow(self):
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 5d1e69e..bf33b9a 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2802,6 +2802,8 @@ class PandasUDFType(object):
 
     GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
 
+    MAP_ITER = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
+
 
 @since(1.3)
 def udf(f=None, returnType=StringType()):
@@ -3278,7 +3280,8 @@ def pandas_udf(f=None, returnType=None, functionType=None):
     if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
                          PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
                          PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
-                         PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
+                         PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
+                         PythonEvalType.SQL_MAP_PANDAS_ITER_UDF]:
         raise ValueError("Invalid functionType: "
                          "functionType must be one the values from PandasUDFType")
 
diff --git a/python/pyspark/sql/tests/test_pandas_udf_iter.py b/python/pyspark/sql/tests/test_pandas_udf_iter.py
index c27cc5a..2a5709e 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_iter.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_iter.py
@@ -57,7 +57,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
         ReusedSQLTestCase.tearDownClass()
 
     def test_map_partitions_in_pandas(self):
-        @pandas_udf('id long', PandasUDFType.SCALAR_ITER)
+        @pandas_udf('id long', PandasUDFType.MAP_ITER)
         def func(iterator):
             for pdf in iterator:
                 assert isinstance(pdf, pd.DataFrame)
@@ -65,7 +65,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
                 yield pdf
 
         df = self.spark.range(10)
-        actual = df.mapPartitionsInPandas(func).collect()
+        actual = df.mapInPandas(func).collect()
         expected = df.collect()
         self.assertEquals(actual, expected)
 
@@ -73,45 +73,45 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
         data = [(1, "foo"), (2, None), (3, "bar"), (4, "bar")]
         df = self.spark.createDataFrame(data, "a int, b string")
 
-        @pandas_udf(df.schema, PandasUDFType.SCALAR_ITER)
+        @pandas_udf(df.schema, PandasUDFType.MAP_ITER)
         def func(iterator):
             for pdf in iterator:
                 assert isinstance(pdf, pd.DataFrame)
                 assert [d.name for d in list(pdf.dtypes)] == ['int32', 'object']
                 yield pdf
 
-        actual = df.mapPartitionsInPandas(func).collect()
+        actual = df.mapInPandas(func).collect()
         expected = df.collect()
         self.assertEquals(actual, expected)
 
     def test_different_output_length(self):
-        @pandas_udf('a long', PandasUDFType.SCALAR_ITER)
+        @pandas_udf('a long', PandasUDFType.MAP_ITER)
         def func(iterator):
             for _ in iterator:
                 yield pd.DataFrame({'a': list(range(100))})
 
         df = self.spark.range(10)
-        actual = df.repartition(1).mapPartitionsInPandas(func).collect()
+        actual = df.repartition(1).mapInPandas(func).collect()
         self.assertEquals(set((r.a for r in actual)), set(range(100)))
 
     def test_empty_iterator(self):
-        @pandas_udf('a int, b string', PandasUDFType.SCALAR_ITER)
+        @pandas_udf('a int, b string', PandasUDFType.MAP_ITER)
         def empty_iter(_):
             return iter([])
 
         self.assertEqual(
-            self.spark.range(10).mapPartitionsInPandas(empty_iter).count(), 0)
+            self.spark.range(10).mapInPandas(empty_iter).count(), 0)
 
     def test_empty_rows(self):
-        @pandas_udf('a int', PandasUDFType.SCALAR_ITER)
+        @pandas_udf('a int', PandasUDFType.MAP_ITER)
         def empty_rows(_):
             return iter([pd.DataFrame({'a': []})])
 
         self.assertEqual(
-            self.spark.range(10).mapPartitionsInPandas(empty_rows).count(), 0)
+            self.spark.range(10).mapInPandas(empty_rows).count(), 0)
 
     def test_chain_map_partitions_in_pandas(self):
-        @pandas_udf('id long', PandasUDFType.SCALAR_ITER)
+        @pandas_udf('id long', PandasUDFType.MAP_ITER)
         def func(iterator):
             for pdf in iterator:
                 assert isinstance(pdf, pd.DataFrame)
@@ -119,7 +119,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase):
                 yield pdf
 
         df = self.spark.range(10)
-        actual = df.mapPartitionsInPandas(func).mapPartitionsInPandas(func).collect()
+        actual = df.mapInPandas(func).mapInPandas(func).collect()
         expected = df.collect()
         self.assertEquals(actual, expected)
 
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 84be2d2..0944c87 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -42,7 +42,8 @@ def _create_udf(f, returnType, evalType):
     if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
                     PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
                     PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
-                    PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
+                    PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
+                    PythonEvalType.SQL_MAP_PANDAS_ITER_UDF):
 
         from pyspark.sql.utils import require_minimum_pyarrow_version
         require_minimum_pyarrow_version()
@@ -135,6 +136,17 @@ class UserDefinedFunction(object):
             else:
                 raise TypeError("Invalid returnType for grouped map Pandas "
                                 "UDFs: returnType must be a StructType.")
+        elif self.evalType == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
+            if isinstance(self._returnType_placeholder, StructType):
+                try:
+                    to_arrow_type(self._returnType_placeholder)
+                except TypeError:
+                    raise NotImplementedError(
+                        "Invalid returnType with map iterator Pandas UDFs: "
+                        "%s is not supported" % str(self._returnType_placeholder))
+            else:
+                raise TypeError("Invalid returnType for map iterator Pandas "
+                                "UDFs: returnType must be a StructType.")
         elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
             try:
                 # StructType is not yet allowed as a return type, explicitly check here to fail fast
@@ -328,10 +340,12 @@ class UDFRegistration(object):
             if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
                                   PythonEvalType.SQL_SCALAR_PANDAS_UDF,
                                   PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
-                                  PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
+                                  PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
+                                  PythonEvalType.SQL_MAP_PANDAS_ITER_UDF]:
                 raise ValueError(
                     "Invalid f: f must be SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF, "
-                    "SQL_SCALAR_PANDAS_ITER_UDF, or SQL_GROUPED_AGG_PANDAS_UDF")
+                    "SQL_SCALAR_PANDAS_ITER_UDF, SQL_GROUPED_AGG_PANDAS_UDF or "
+                    "SQL_MAP_PANDAS_ITER_UDF.")
             register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
                                                evalType=f.evalType,
                                                deterministic=f.deterministic)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 568902f..b34abd0 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -295,7 +295,10 @@ def read_udfs(pickleSer, infile, eval_type):
     is_map_iter = eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF
 
     if is_scalar_iter or is_map_iter:
-        assert num_udfs == 1, "One SCALAR_ITER UDF expected here."
+        if is_scalar_iter:
+            assert num_udfs == 1, "One SCALAR_ITER UDF expected here."
+        if is_map_iter:
+            assert num_udfs == 1, "One MAP_ITER UDF expected here."
 
         arg_offsets, udf = read_single_udf(
             pickleSer, infile, eval_type, runner_conf, udf_index=0)
@@ -318,7 +321,7 @@ def read_udfs(pickleSer, infile, eval_type):
             for result_batch, result_type in result_iter:
                 num_output_rows += len(result_batch)
                 assert is_map_iter or num_output_rows <= num_input_rows[0], \
-                    "Pandas SCALAR_ITER UDF outputted more rows than input rows."
+                    "Pandas MAP_ITER UDF outputted more rows than input rows."
                 yield (result_batch, result_type)
 
             if is_scalar_iter:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index 757e46a..83695e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -41,9 +41,9 @@ case class FlatMapGroupsInPandas(
 
 /**
  * Map partitions using an udf: iter(pandas.Dataframe) -> iter(pandas.DataFrame).
- * This is used by DataFrame.mapPartitionsInPandas()
+ * This is used by DataFrame.mapInPandas()
  */
-case class MapPartitionsInPandas(
+case class MapInPandas(
     functionExpr: Expression,
     output: Seq[Attribute],
     child: LogicalPlan) extends UnaryNode {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fe5b15c..147222c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2651,14 +2651,12 @@ class Dataset[T] private[sql](
    * This function uses Apache Arrow as serialization format between Java executors and Python
    * workers.
    */
-  private[sql] def mapPartitionsInPandas(f: PythonUDF): DataFrame = {
+  private[sql] def mapInPandas(func: PythonUDF): DataFrame = {
     Dataset.ofRows(
       sparkSession,
-      MapPartitionsInPandas(
-        // Here, the evalType is SQL_SCALAR_PANDAS_ITER_UDF since we share the
-        // same Pandas type. To avoid conflicts, it sets SQL_MAP_PANDAS_ITER_UDF here.
-        f.copy(evalType = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF),
-        f.dataType.asInstanceOf[StructType].toAttributes,
+      MapInPandas(
+        func,
+        func.dataType.asInstanceOf[StructType].toAttributes,
         logicalPlan))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ea0c970..c4d5a2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -682,8 +682,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           f, p, b, is, ot, planLater(child)) :: Nil
       case logical.FlatMapGroupsInPandas(grouping, func, output, child) =>
         execution.python.FlatMapGroupsInPandasExec(grouping, func, output, planLater(child)) :: Nil
-      case logical.MapPartitionsInPandas(func, output, child) =>
-        execution.python.MapPartitionsInPandasExec(func, output, planLater(child)) :: Nil
+      case logical.MapInPandas(func, output, child) =>
+        execution.python.MapInPandasExec(func, output, planLater(child)) :: Nil
       case logical.MapElements(f, _, _, objAttr, child) =>
         execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
       case logical.AppendColumns(f, _, _, in, out, child) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapPartitionsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
similarity index 99%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapPartitionsInPandasExec.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
index 814366c..2bb8081 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapPartitionsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
  * `org.apache.spark.sql.catalyst.plans.logical.MapPartitionsInRWithArrow`
  *
  */
-case class MapPartitionsInPandasExec(
+case class MapInPandasExec(
     func: Expression,
     output: Seq[Attribute],
     child: SparkPlan)

