Posted to commits@spark.apache.org by gu...@apache.org on 2018/01/30 12:56:01 UTC

spark git commit: [SPARK-23261][PYSPARK] Rename Pandas UDFs

Repository: spark
Updated Branches:
  refs/heads/master 0a9ac0248 -> 7a2ada223


[SPARK-23261][PYSPARK] Rename Pandas UDFs

## What changes were proposed in this pull request?
Rename the public APIs and names of Pandas UDFs; a short sketch of the renamed API follows the list below.

- `PANDAS SCALAR UDF` -> `SCALAR PANDAS UDF`
- `PANDAS GROUP MAP UDF` -> `GROUPED MAP PANDAS UDF`
- `PANDAS GROUP AGG UDF` -> `GROUPED AGG PANDAS UDF`
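
For quick reference, a minimal sketch of the renamed Python API, adapted from the doctests in this diff (it assumes PyArrow is installed; `subtract_mean` and `mean_udf` mirror the examples in the diff and are illustrative only):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# SCALAR keeps its name; GROUP_MAP and GROUP_AGG become GROUPED_MAP and GROUPED_AGG.
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame holding one group
    return pdf.assign(v=pdf.v - pdf.v.mean())

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    # v is a pandas.Series; the UDF returns one scalar per group
    return v.mean()

df.groupby("id").apply(subtract_mean).show()
df.groupby("id").agg(mean_udf(df["v"])).show()
```

Only the names change in this patch; the behavior of the three UDF types is untouched.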

## How was this patch tested?
The existing tests.

Author: gatorsmile <ga...@gmail.com>

Closes #20428 from gatorsmile/renamePandasUDFs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a2ada22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a2ada22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a2ada22

Branch: refs/heads/master
Commit: 7a2ada223e14d09271a76091be0338b2d375081e
Parents: 0a9ac02
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 30 21:55:55 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Tue Jan 30 21:55:55 2018 +0900

----------------------------------------------------------------------
 .../apache/spark/api/python/PythonRunner.scala  | 12 +--
 docs/sql-programming-guide.md                   |  8 +-
 examples/src/main/python/sql/arrow.py           | 12 +--
 python/pyspark/rdd.py                           |  6 +-
 python/pyspark/sql/functions.py                 | 34 ++++----
 python/pyspark/sql/group.py                     | 10 +--
 python/pyspark/sql/tests.py                     | 92 ++++++++++----------
 python/pyspark/sql/udf.py                       | 25 +++---
 python/pyspark/worker.py                        | 24 ++---
 .../sql/catalyst/expressions/PythonUDF.scala    |  4 +-
 .../spark/sql/catalyst/planning/patterns.scala  |  1 -
 .../spark/sql/RelationalGroupedDataset.scala    |  4 +-
 .../python/AggregateInPandasExec.scala          |  2 +-
 .../execution/python/ArrowEvalPythonExec.scala  |  2 +-
 .../execution/python/ExtractPythonUDFs.scala    |  2 +-
 .../python/FlatMapGroupsInPandasExec.scala      |  2 +-
 16 files changed, 120 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 29148a7..f075a7e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -37,16 +37,16 @@ private[spark] object PythonEvalType {
 
   val SQL_BATCHED_UDF = 100
 
-  val SQL_PANDAS_SCALAR_UDF = 200
-  val SQL_PANDAS_GROUP_MAP_UDF = 201
-  val SQL_PANDAS_GROUP_AGG_UDF = 202
+  val SQL_SCALAR_PANDAS_UDF = 200
+  val SQL_GROUPED_MAP_PANDAS_UDF = 201
+  val SQL_GROUPED_AGG_PANDAS_UDF = 202
 
   def toString(pythonEvalType: Int): String = pythonEvalType match {
     case NON_UDF => "NON_UDF"
     case SQL_BATCHED_UDF => "SQL_BATCHED_UDF"
-    case SQL_PANDAS_SCALAR_UDF => "SQL_PANDAS_SCALAR_UDF"
-    case SQL_PANDAS_GROUP_MAP_UDF => "SQL_PANDAS_GROUP_MAP_UDF"
-    case SQL_PANDAS_GROUP_AGG_UDF => "SQL_PANDAS_GROUP_AGG_UDF"
+    case SQL_SCALAR_PANDAS_UDF => "SQL_SCALAR_PANDAS_UDF"
+    case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
+    case SQL_GROUPED_AGG_PANDAS_UDF => "SQL_GROUPED_AGG_PANDAS_UDF"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d49c8d8..a0e221b 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1684,7 +1684,7 @@ Spark will fall back to create the DataFrame without Arrow.
 Pandas UDFs are user-defined functions that are executed by Spark using Arrow to transfer data and
 Pandas to work with the data. A Pandas UDF is defined using the `pandas_udf` function as a decorator
 or to wrap the function; no additional configuration is required. Currently, there are two types of
-Pandas UDF: Scalar and Group Map.
+Pandas UDF: Scalar and Grouped Map.
 
 ### Scalar
 
@@ -1702,8 +1702,8 @@ The following example shows how to create a scalar Pandas UDF that computes the
 </div>
 </div>
 
-### Group Map
-Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
+### Grouped Map
+Grouped map Pandas UDFs are used with `groupBy().apply()`, which implements the "split-apply-combine" pattern.
 Split-apply-combine consists of three steps:
 * Split the data into groups by using `DataFrame.groupBy`.
 * Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
@@ -1723,7 +1723,7 @@ The following example shows how to use `groupby().apply()` to subtract the mean
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example group_map_pandas_udf python/sql/arrow.py %}
+{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/examples/src/main/python/sql/arrow.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index 6c0028b..4c5aefb 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -86,15 +86,15 @@ def scalar_pandas_udf_example(spark):
     # $example off:scalar_pandas_udf$
 
 
-def group_map_pandas_udf_example(spark):
-    # $example on:group_map_pandas_udf$
+def grouped_map_pandas_udf_example(spark):
+    # $example on:grouped_map_pandas_udf$
     from pyspark.sql.functions import pandas_udf, PandasUDFType
 
     df = spark.createDataFrame(
         [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
         ("id", "v"))
 
-    @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
     def subtract_mean(pdf):
         # pdf is a pandas.DataFrame
         v = pdf.v
@@ -110,7 +110,7 @@ def group_map_pandas_udf_example(spark):
     # |  2|-1.0|
     # |  2| 4.0|
     # +---+----+
-    # $example off:group_map_pandas_udf$
+    # $example off:grouped_map_pandas_udf$
 
 
 if __name__ == "__main__":
@@ -123,7 +123,7 @@ if __name__ == "__main__":
     dataframe_with_arrow_example(spark)
     print("Running pandas_udf scalar example")
     scalar_pandas_udf_example(spark)
-    print("Running pandas_udf group map example")
-    group_map_pandas_udf_example(spark)
+    print("Running pandas_udf grouped map example")
+    grouped_map_pandas_udf_example(spark)
 
     spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6b018c3..93b8974 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -68,9 +68,9 @@ class PythonEvalType(object):
 
     SQL_BATCHED_UDF = 100
 
-    SQL_PANDAS_SCALAR_UDF = 200
-    SQL_PANDAS_GROUP_MAP_UDF = 201
-    SQL_PANDAS_GROUP_AGG_UDF = 202
+    SQL_SCALAR_PANDAS_UDF = 200
+    SQL_GROUPED_MAP_PANDAS_UDF = 201
+    SQL_GROUPED_AGG_PANDAS_UDF = 202
 
 
 def portable_hash(x):

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index a291c9b..3c8fb4c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1737,8 +1737,8 @@ def translate(srcCol, matching, replace):
 def create_map(*cols):
     """Creates a new map column.
 
-    :param cols: list of column names (string) or list of :class:`Column` expressions that grouped
-        as key-value pairs, e.g. (key1, value1, key2, value2, ...).
+    :param cols: list of column names (string) or list of :class:`Column` expressions that are
+        grouped as key-value pairs, e.g. (key1, value1, key2, value2, ...).
 
     >>> df.select(create_map('name', 'age').alias("map")).collect()
     [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
@@ -2085,11 +2085,11 @@ def map_values(col):
 class PandasUDFType(object):
     """Pandas UDF Types. See :meth:`pyspark.sql.functions.pandas_udf`.
     """
-    SCALAR = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+    SCALAR = PythonEvalType.SQL_SCALAR_PANDAS_UDF
 
-    GROUP_MAP = PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF
+    GROUPED_MAP = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
 
-    GROUP_AGG = PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF
+    GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
 
 
 @since(1.3)
@@ -2193,20 +2193,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            Therefore, this can be used, for example, to ensure the length of each returned
            `pandas.Series`, and can not be used as the column length.
 
-    2. GROUP_MAP
+    2. GROUPED_MAP
 
-       A group map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
+       A grouped map UDF defines a transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`.
        The returnType should be a :class:`StructType` describing the schema of the returned
        `pandas.DataFrame`.
        The length of the returned `pandas.DataFrame` can be arbitrary.
 
-       Group map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
+       Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
 
        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
        >>> df = spark.createDataFrame(
        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ...     ("id", "v"))  # doctest: +SKIP
-       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
+       >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
        ... def normalize(pdf):
        ...     v = pdf.v
        ...     return pdf.assign(v=(v - v.mean()) / v.std())
@@ -2223,9 +2223,9 @@ def pandas_udf(f=None, returnType=None, functionType=None):
 
        .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
 
-    3. GROUP_AGG
+    3. GROUPED_AGG
 
-       A group aggregate UDF defines a transformation: One or more `pandas.Series` -> A scalar
+       A grouped aggregate UDF defines a transformation: One or more `pandas.Series` -> A scalar.
        The `returnType` should be a primitive data type, e.g., :class:`DoubleType`.
        The returned scalar can be either a python primitive type, e.g., `int` or `float`
        or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
@@ -2239,7 +2239,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        >>> df = spark.createDataFrame(
        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ...     ("id", "v"))
-       >>> @pandas_udf("double", PandasUDFType.GROUP_AGG)  # doctest: +SKIP
+       >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
        ... def mean_udf(v):
        ...     return v.mean()
        >>> df.groupby("id").agg(mean_udf(df['v'])).show()  # doctest: +SKIP
@@ -2285,21 +2285,21 @@ def pandas_udf(f=None, returnType=None, functionType=None):
             eval_type = returnType
         else:
             # @pandas_udf(dataType) or @pandas_udf(returnType=dataType)
-            eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+            eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
     else:
         return_type = returnType
 
         if functionType is not None:
             eval_type = functionType
         else:
-            eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+            eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
 
     if return_type is None:
         raise ValueError("Invalid returnType: returnType can not be None")
 
-    if eval_type not in [PythonEvalType.SQL_PANDAS_SCALAR_UDF,
-                         PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF,
-                         PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF]:
+    if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                         PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+                         PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]:
         raise ValueError("Invalid functionType: "
                          "functionType must be one the values from PandasUDFType")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/python/pyspark/sql/group.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index f90a909..ab64653 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -98,7 +98,7 @@ class GroupedData(object):
         [Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]
 
         >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-        >>> @pandas_udf('int', PandasUDFType.GROUP_AGG)  # doctest: +SKIP
+        >>> @pandas_udf('int', PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
         ... def min_udf(v):
         ...     return v.min()
         >>> sorted(gdf.agg(min_udf(df.age)).collect())  # doctest: +SKIP
@@ -235,14 +235,14 @@ class GroupedData(object):
             into memory, so the user should be aware of the potential OOM risk if data is skewed
             and certain groups are too large to fit in memory.
 
-        :param udf: a group map user-defined function returned by
+        :param udf: a grouped map user-defined function returned by
             :func:`pyspark.sql.functions.pandas_udf`.
 
         >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
         >>> df = spark.createDataFrame(
         ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
         ...     ("id", "v"))
-        >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
+        >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
         ... def normalize(pdf):
         ...     v = pdf.v
         ...     return pdf.assign(v=(v - v.mean()) / v.std())
@@ -262,9 +262,9 @@ class GroupedData(object):
         """
         # Columns are special because hasattr always returns True
         if isinstance(udf, Column) or not hasattr(udf, 'func') \
-           or udf.evalType != PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
+           or udf.evalType != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
             raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
-                             "GROUP_MAP.")
+                             "GROUPED_MAP.")
         df = self._df
         udf_column = udf(*[df[col] for col in df.columns])
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index ca7bbf8..dc80870 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3621,34 +3621,34 @@ class PandasUDFTests(ReusedSQLTestCase):
 
         udf = pandas_udf(lambda x: x, DoubleType())
         self.assertEqual(udf.returnType, DoubleType())
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
         self.assertEqual(udf.returnType, DoubleType())
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, 'double', PandasUDFType.SCALAR)
         self.assertEqual(udf.returnType, DoubleType())
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, StructType([StructField("v", DoubleType())]),
-                         PandasUDFType.GROUP_MAP)
+                         PandasUDFType.GROUPED_MAP)
         self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
-        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
+        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP)
         self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, 'v double',
-                         functionType=PandasUDFType.GROUP_MAP)
+                         functionType=PandasUDFType.GROUPED_MAP)
         self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, returnType='v double',
-                         functionType=PandasUDFType.GROUP_MAP)
+                         functionType=PandasUDFType.GROUPED_MAP)
         self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     def test_pandas_udf_decorator(self):
         from pyspark.rdd import PythonEvalType
@@ -3659,45 +3659,45 @@ class PandasUDFTests(ReusedSQLTestCase):
         def foo(x):
             return x
         self.assertEqual(foo.returnType, DoubleType())
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         @pandas_udf(returnType=DoubleType())
         def foo(x):
             return x
         self.assertEqual(foo.returnType, DoubleType())
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         schema = StructType([StructField("v", DoubleType())])
 
-        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
+        @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
-        @pandas_udf('v double', PandasUDFType.GROUP_MAP)
+        @pandas_udf('v double', PandasUDFType.GROUPED_MAP)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
-        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
+        @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
         @pandas_udf(returnType='v double', functionType=PandasUDFType.SCALAR)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
-        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
+        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     def test_udf_wrong_arg(self):
         from pyspark.sql.functions import pandas_udf, PandasUDFType
@@ -3724,15 +3724,15 @@ class PandasUDFTests(ReusedSQLTestCase):
                     return 1
 
             with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
-                @pandas_udf(returnType=PandasUDFType.GROUP_MAP)
+                @pandas_udf(returnType=PandasUDFType.GROUPED_MAP)
                 def foo(df):
                     return df
             with self.assertRaisesRegexp(ValueError, 'Invalid returnType'):
-                @pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP)
+                @pandas_udf(returnType='double', functionType=PandasUDFType.GROUPED_MAP)
                 def foo(df):
                     return df
             with self.assertRaisesRegexp(ValueError, 'Invalid function'):
-                @pandas_udf(returnType='k int, v double', functionType=PandasUDFType.GROUP_MAP)
+                @pandas_udf(returnType='k int, v double', functionType=PandasUDFType.GROUPED_MAP)
                 def foo(k, v):
                     return k
 
@@ -3804,11 +3804,11 @@ class ScalarPandasUDF(ReusedSQLTestCase):
         random_pandas_udf = pandas_udf(
             lambda x: random.randint(6, 6) + x, IntegerType()).asNondeterministic()
         self.assertEqual(random_pandas_udf.deterministic, False)
-        self.assertEqual(random_pandas_udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(random_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
         nondeterministic_pandas_udf = self.spark.catalog.registerFunction(
             "randomPandasUDF", random_pandas_udf)
         self.assertEqual(nondeterministic_pandas_udf.deterministic, False)
-        self.assertEqual(nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
         [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
         self.assertEqual(row[0], 7)
 
@@ -4206,7 +4206,7 @@ class ScalarPandasUDF(ReusedSQLTestCase):
             col('id').cast('int').alias('b'))
         original_add = pandas_udf(lambda x, y: x + y, IntegerType())
         self.assertEqual(original_add.deterministic, True)
-        self.assertEqual(original_add.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(original_add.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
         new_add = self.spark.catalog.registerFunction("add1", original_add)
         res1 = df.select(new_add(col('a'), col('b')))
         res2 = self.spark.sql(
@@ -4237,20 +4237,20 @@ class GroupbyApplyPandasUDFTests(ReusedSQLTestCase):
                  StructField('v', IntegerType()),
                  StructField('v1', DoubleType()),
                  StructField('v2', LongType())]),
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
 
         result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
         expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
         self.assertPandasEqual(expected, result)
 
-    def test_register_group_map_udf(self):
+    def test_register_grouped_map_udf(self):
         from pyspark.sql.functions import pandas_udf, PandasUDFType
 
-        foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUP_MAP)
+        foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(ValueError, 'f must be either SQL_BATCHED_UDF or '
-                                                     'SQL_PANDAS_SCALAR_UDF'):
+                                                     'SQL_SCALAR_PANDAS_UDF'):
                 self.spark.catalog.registerFunction("foo_udf", foo_udf)
 
     def test_decorator(self):
@@ -4259,7 +4259,7 @@ class GroupbyApplyPandasUDFTests(ReusedSQLTestCase):
 
         @pandas_udf(
             'id long, v int, v1 double, v2 long',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
         def foo(pdf):
             return pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id)
@@ -4275,7 +4275,7 @@ class GroupbyApplyPandasUDFTests(ReusedSQLTestCase):
         foo = pandas_udf(
             lambda pdf: pdf,
             'id long, v double',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
 
         result = df.groupby('id').apply(foo).sort('id').toPandas()
@@ -4289,7 +4289,7 @@ class GroupbyApplyPandasUDFTests(ReusedSQLTestCase):
 
         @pandas_udf(
             'id long, v int, norm double',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
         def normalize(pdf):
             v = pdf.v
@@ -4308,7 +4308,7 @@ class GroupbyApplyPandasUDFTests(ReusedSQLTestCase):
 
         @pandas_udf(
             'id long, v int, norm double',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
         def normalize(pdf):
             v = pdf.v
@@ -4328,7 +4328,7 @@ class GroupbyApplyPandasUDFTests(ReusedSQLTestCase):
         foo_udf = pandas_udf(
             lambda pdf: pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id),
             'id long, v int, v1 double, v2 long',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
 
         result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
@@ -4342,7 +4342,7 @@ class GroupbyApplyPandasUDFTests(ReusedSQLTestCase):
         foo = pandas_udf(
             lambda pdf: pdf,
             'id long, v map<int, int>',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
 
         with QuietTest(self.sc):
@@ -4368,7 +4368,7 @@ class GroupbyApplyPandasUDFTests(ReusedSQLTestCase):
             with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                 df.groupby('id').apply(
                     pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())])))
-            with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUP_MAP'):
+            with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUPED_MAP'):
                 df.groupby('id').apply(
                     pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())]),
                                PandasUDFType.SCALAR))
@@ -4379,7 +4379,7 @@ class GroupbyApplyPandasUDFTests(ReusedSQLTestCase):
             [StructField("id", LongType(), True),
              StructField("map", MapType(StringType(), IntegerType()), True)])
         df = self.spark.createDataFrame([(1, None,)], schema=schema)
-        f = pandas_udf(lambda x: x, df.schema, PandasUDFType.GROUP_MAP)
+        f = pandas_udf(lambda x: x, df.schema, PandasUDFType.GROUPED_MAP)
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
                 df.groupby('id').apply(f).collect()
@@ -4422,7 +4422,7 @@ class GroupbyAggPandasUDFTests(ReusedSQLTestCase):
     def pandas_agg_mean_udf(self):
         from pyspark.sql.functions import pandas_udf, PandasUDFType
 
-        @pandas_udf('double', PandasUDFType.GROUP_AGG)
+        @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def avg(v):
             return v.mean()
         return avg
@@ -4431,7 +4431,7 @@ class GroupbyAggPandasUDFTests(ReusedSQLTestCase):
     def pandas_agg_sum_udf(self):
         from pyspark.sql.functions import pandas_udf, PandasUDFType
 
-        @pandas_udf('double', PandasUDFType.GROUP_AGG)
+        @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def sum(v):
             return v.sum()
         return sum
@@ -4441,7 +4441,7 @@ class GroupbyAggPandasUDFTests(ReusedSQLTestCase):
         import numpy as np
         from pyspark.sql.functions import pandas_udf, PandasUDFType
 
-        @pandas_udf('double', PandasUDFType.GROUP_AGG)
+        @pandas_udf('double', PandasUDFType.GROUPED_AGG)
         def weighted_mean(v, w):
             return np.average(v, weights=w)
         return weighted_mean
@@ -4505,19 +4505,19 @@ class GroupbyAggPandasUDFTests(ReusedSQLTestCase):
 
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
-                @pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUP_AGG)
+                @pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUPED_AGG)
                 def mean_and_std_udf(v):
                     return [v.mean(), v.std()]
 
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
-                @pandas_udf('mean double, std double', PandasUDFType.GROUP_AGG)
+                @pandas_udf('mean double, std double', PandasUDFType.GROUPED_AGG)
                 def mean_and_std_udf(v):
                     return v.mean(), v.std()
 
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
-                @pandas_udf(MapType(DoubleType(), DoubleType()), PandasUDFType.GROUP_AGG)
+                @pandas_udf(MapType(DoubleType(), DoubleType()), PandasUDFType.GROUPED_AGG)
                 def mean_and_std_udf(v):
                     return {v.mean(): v.std()}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/python/pyspark/sql/udf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 4f30330..0f759c4 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -37,9 +37,9 @@ def _wrap_function(sc, func, returnType):
 
 def _create_udf(f, returnType, evalType):
 
-    if evalType in (PythonEvalType.SQL_PANDAS_SCALAR_UDF,
-                    PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF,
-                    PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF):
+    if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                    PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+                    PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
 
         import inspect
         from pyspark.sql.utils import require_minimum_pyarrow_version
@@ -47,16 +47,16 @@ def _create_udf(f, returnType, evalType):
         require_minimum_pyarrow_version()
         argspec = inspect.getargspec(f)
 
-        if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF and len(argspec.args) == 0 and \
+        if evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF and len(argspec.args) == 0 and \
                 argspec.varargs is None:
             raise ValueError(
                 "Invalid function: 0-arg pandas_udfs are not supported. "
                 "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
             )
 
-        if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and len(argspec.args) != 1:
+        if evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF and len(argspec.args) != 1:
             raise ValueError(
-                "Invalid function: pandas_udfs with function type GROUP_MAP "
+                "Invalid function: pandas_udfs with function type GROUPED_MAP "
                 "must take a single arg that is a pandas DataFrame."
             )
 
@@ -112,14 +112,15 @@ class UserDefinedFunction(object):
             else:
                 self._returnType_placeholder = _parse_datatype_string(self._returnType)
 
-        if self.evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF \
+        if self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF \
                 and not isinstance(self._returnType_placeholder, StructType):
             raise ValueError("Invalid returnType: returnType must be a StructType for "
-                             "pandas_udf with function type GROUP_MAP")
-        elif self.evalType == PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF \
+                             "pandas_udf with function type GROUPED_MAP")
+        elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF \
                 and isinstance(self._returnType_placeholder, (StructType, ArrayType, MapType)):
             raise NotImplementedError(
-                "ArrayType, StructType and MapType are not supported with PandasUDFType.GROUP_AGG")
+                "ArrayType, StructType and MapType are not supported with "
+                "PandasUDFType.GROUPED_AGG")
 
         return self._returnType_placeholder
 
@@ -292,9 +293,9 @@ class UDFRegistration(object):
                     "Invalid returnType: data type can not be specified when f is"
                     "a user-defined function, but got %s." % returnType)
             if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
-                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
+                                  PythonEvalType.SQL_SCALAR_PANDAS_UDF]:
                 raise ValueError(
-                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
+                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_SCALAR_PANDAS_UDF")
             register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
                                                evalType=f.evalType,
                                                deterministic=f.deterministic)

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 173d8fb..121b3dd 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -74,7 +74,7 @@ def wrap_udf(f, return_type):
         return lambda *a: f(*a)
 
 
-def wrap_pandas_scalar_udf(f, return_type):
+def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)
 
     def verify_result_length(*a):
@@ -90,7 +90,7 @@ def wrap_pandas_scalar_udf(f, return_type):
     return lambda *a: (verify_result_length(*a), arrow_return_type)
 
 
-def wrap_pandas_group_map_udf(f, return_type):
+def wrap_grouped_map_pandas_udf(f, return_type):
     def wrapped(*series):
         import pandas as pd
 
@@ -110,7 +110,7 @@ def wrap_pandas_group_map_udf(f, return_type):
     return wrapped
 
 
-def wrap_pandas_group_agg_udf(f, return_type):
+def wrap_grouped_agg_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)
 
     def wrapped(*series):
@@ -133,12 +133,12 @@ def read_single_udf(pickleSer, infile, eval_type):
             row_func = chain(row_func, f)
 
     # the last returnType will be the return type of UDF
-    if eval_type == PythonEvalType.SQL_PANDAS_SCALAR_UDF:
-        return arg_offsets, wrap_pandas_scalar_udf(row_func, return_type)
-    elif eval_type == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
-        return arg_offsets, wrap_pandas_group_map_udf(row_func, return_type)
-    elif eval_type == PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF:
-        return arg_offsets, wrap_pandas_group_agg_udf(row_func, return_type)
+    if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
+        return arg_offsets, wrap_scalar_pandas_udf(row_func, return_type)
+    elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
+        return arg_offsets, wrap_grouped_map_pandas_udf(row_func, return_type)
+    elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
+        return arg_offsets, wrap_grouped_agg_pandas_udf(row_func, return_type)
     elif eval_type == PythonEvalType.SQL_BATCHED_UDF:
         return arg_offsets, wrap_udf(row_func, return_type)
     else:
@@ -163,9 +163,9 @@ def read_udfs(pickleSer, infile, eval_type):
 
     func = lambda _, it: map(mapper, it)
 
-    if eval_type in (PythonEvalType.SQL_PANDAS_SCALAR_UDF,
-                     PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF,
-                     PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF):
+    if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                     PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+                     PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
         timezone = utf8_deserializer.loads(infile)
         ser = ArrowStreamPandasSerializer(timezone)
     else:

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
index 4ba8ff6..efd664d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.types.DataType
 object PythonUDF {
   private[this] val SCALAR_TYPES = Set(
     PythonEvalType.SQL_BATCHED_UDF,
-    PythonEvalType.SQL_PANDAS_SCALAR_UDF
+    PythonEvalType.SQL_SCALAR_PANDAS_UDF
   )
 
   def isScalarPythonUDF(e: Expression): Boolean = {
@@ -36,7 +36,7 @@ object PythonUDF {
 
   def isGroupAggPandasUDF(e: Expression): Boolean = {
     e.isInstanceOf[PythonUDF] &&
-      e.asInstanceOf[PythonUDF].evalType == PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF
+      e.asInstanceOf[PythonUDF].evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 1322410..626f905 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.planning
 
-import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index d320c1c..7147798 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -449,8 +449,8 @@ class RelationalGroupedDataset protected[sql](
    * workers.
    */
   private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
-    require(expr.evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF,
-      "Must pass a group map udf")
+    require(expr.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+      "Must pass a grouped map udf")
     require(expr.dataType.isInstanceOf[StructType],
       "The returnType of the udf must be a StructType")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 18e5f86..8e01e8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -136,7 +136,7 @@ case class AggregateInPandasExec(
 
       val columnarBatchIter = new ArrowPythonRunner(
         pyFuncs, bufferSize, reuseWorker,
-        PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF, argOffsets, aggInputSchema,
+        PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, argOffsets, aggInputSchema,
         sessionLocalTimeZone, pandasRespectSessionTimeZone)
         .compute(projectedRowIter, context.partitionId(), context)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 47b146f..c4de214 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -81,7 +81,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 
     val columnarBatchIter = new ArrowPythonRunner(
         funcs, bufferSize, reuseWorker,
-        PythonEvalType.SQL_PANDAS_SCALAR_UDF, argOffsets, schema,
+        PythonEvalType.SQL_SCALAR_PANDAS_UDF, argOffsets, schema,
         sessionLocalTimeZone, pandasRespectSessionTimeZone)
       .compute(batchIter, context.partitionId(), context)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index 4ae4e16..9d56f48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -160,7 +160,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
           }
 
           val evaluation = validUdfs.partition(
-            _.evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF
+            _.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
           ) match {
             case (vectorizedUdfs, plainUdfs) if plainUdfs.isEmpty =>
               ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child)

http://git-wip-us.apache.org/repos/asf/spark/blob/7a2ada22/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index 59db66b..c798fe5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -96,7 +96,7 @@ case class FlatMapGroupsInPandasExec(
 
       val columnarBatchIter = new ArrowPythonRunner(
         chainedFunc, bufferSize, reuseWorker,
-        PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
+        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, argOffsets, schema,
         sessionLocalTimeZone, pandasRespectSessionTimeZone)
           .compute(grouped, context.partitionId(), context)
 

