You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/31 01:08:45 UTC

spark git commit: [SPARK-23261][PYSPARK][BACKPORT-2.3] Rename Pandas UDFs

Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f4802dc88 -> 7b9fe0865


[SPARK-23261][PYSPARK][BACKPORT-2.3] Rename Pandas UDFs

This PR is to backport https://github.com/apache/spark/pull/20428 to Spark 2.3 without adding the changes regarding `GROUPED AGG PANDAS UDF`

---

## What changes were proposed in this pull request?
Rename the public APIs and names of pandas udfs.

- `PANDAS SCALAR UDF` -> `SCALAR PANDAS UDF`
- `PANDAS GROUP MAP UDF` -> `GROUPED MAP PANDAS UDF`

## How was this patch tested?
The existing tests

Author: gatorsmile <ga...@gmail.com>

Closes #20439 from gatorsmile/backport2.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b9fe086
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b9fe086
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b9fe086

Branch: refs/heads/branch-2.3
Commit: 7b9fe08658b529901a5f22bf81ffdd4410180809
Parents: f4802dc
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Jan 30 17:08:30 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 30 17:08:30 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/api/python/PythonRunner.scala  |  8 +-
 docs/sql-programming-guide.md                   |  8 +-
 examples/src/main/python/sql/arrow.py           | 12 +--
 python/pyspark/rdd.py                           |  4 +-
 python/pyspark/sql/functions.py                 | 24 +++---
 python/pyspark/sql/group.py                     | 10 +--
 python/pyspark/sql/tests.py                     | 80 ++++++++++----------
 python/pyspark/sql/udf.py                       | 18 ++---
 python/pyspark/worker.py                        | 16 ++--
 .../spark/sql/RelationalGroupedDataset.scala    |  4 +-
 .../execution/python/ArrowEvalPythonExec.scala  |  2 +-
 .../execution/python/ExtractPythonUDFs.scala    |  4 +-
 .../python/FlatMapGroupsInPandasExec.scala      |  2 +-
 13 files changed, 96 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 1ec0e71..719ce5b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -37,14 +37,14 @@ private[spark] object PythonEvalType {
 
   val SQL_BATCHED_UDF = 100
 
-  val SQL_PANDAS_SCALAR_UDF = 200
-  val SQL_PANDAS_GROUP_MAP_UDF = 201
+  val SQL_SCALAR_PANDAS_UDF = 200
+  val SQL_GROUPED_MAP_PANDAS_UDF = 201
 
   def toString(pythonEvalType: Int): String = pythonEvalType match {
     case NON_UDF => "NON_UDF"
     case SQL_BATCHED_UDF => "SQL_BATCHED_UDF"
-    case SQL_PANDAS_SCALAR_UDF => "SQL_PANDAS_SCALAR_UDF"
-    case SQL_PANDAS_GROUP_MAP_UDF => "SQL_PANDAS_GROUP_MAP_UDF"
+    case SQL_SCALAR_PANDAS_UDF => "SQL_SCALAR_PANDAS_UDF"
+    case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d49c8d8..a0e221b 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1684,7 +1684,7 @@ Spark will fall back to create the DataFrame without Arrow.
 Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
 Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
 or to wrap the function, no additional configuration is required. Currently, there are two types of
-Pandas UDF: Scalar and Group Map.
+Pandas UDF: Scalar and Grouped Map.
 
 ### Scalar
 
@@ -1702,8 +1702,8 @@ The following example shows how to create a scalar Pandas UDF that computes the
 </div>
 </div>
 
-### Group Map
-Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
+### Grouped Map
+Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
 Split-apply-combine consists of three steps:
 * Split the data into groups by using `DataFrame.groupBy`.
 * Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
@@ -1723,7 +1723,7 @@ The following example shows how to use `groupby().apply()` to subtract the mean
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example group_map_pandas_udf python/sql/arrow.py %}
+{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/examples/src/main/python/sql/arrow.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index 6c0028b..4c5aefb 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -86,15 +86,15 @@ def scalar_pandas_udf_example(spark):
     # $example off:scalar_pandas_udf$
 
 
-def group_map_pandas_udf_example(spark):
-    # $example on:group_map_pandas_udf$
+def grouped_map_pandas_udf_example(spark):
+    # $example on:grouped_map_pandas_udf$
     from pyspark.sql.functions import pandas_udf, PandasUDFType
 
     df = spark.createDataFrame(
         [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
         ("id", "v"))
 
-    @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
     def substract_mean(pdf):
         # pdf is a pandas.DataFrame
         v = pdf.v
@@ -110,7 +110,7 @@ def group_map_pandas_udf_example(spark):
     # |  2|-1.0|
     # |  2| 4.0|
     # +---+----+
-    # $example off:group_map_pandas_udf$
+    # $example off:grouped_map_pandas_udf$
 
 
 if __name__ == "__main__":
@@ -123,7 +123,7 @@ if __name__ == "__main__":
     dataframe_with_arrow_example(spark)
     print("Running pandas_udf scalar example")
     scalar_pandas_udf_example(spark)
-    print("Running pandas_udf group map example")
-    group_map_pandas_udf_example(spark)
+    print("Running pandas_udf grouped map example")
+    grouped_map_pandas_udf_example(spark)
 
     spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 340bc3a..9a6010e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -68,8 +68,8 @@ class PythonEvalType(object):
 
     SQL_BATCHED_UDF = 100
 
-    SQL_PANDAS_SCALAR_UDF = 200
-    SQL_PANDAS_GROUP_MAP_UDF = 201
+    SQL_SCALAR_PANDAS_UDF = 200
+    SQL_GROUPED_MAP_PANDAS_UDF = 201
 
 
 def portable_hash(x):

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 961b326..b637707 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1737,8 +1737,8 @@ def translate(srcCol, matching, replace):
 def create_map(*cols):
     """Creates a new map column.
 
-    :param cols: list of column names (string) or list of :class:`Column` expressions that grouped
-        as key-value pairs, e.g. (key1, value1, key2, value2, ...).
+    :param cols: list of column names (string) or list of :class:`Column` expressions that are
+        grouped as key-value pairs, e.g. (key1, value1, key2, value2, ...).
 
     >>> df.select(create_map('name', 'age').alias("map")).collect()
     [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
@@ -2085,9 +2085,9 @@ def map_values(col):
 class PandasUDFType(object):
     """Pandas UDF Types. See :meth:`pyspark.sql.functions.pandas_udf`.
     """
-    SCALAR = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+    SCALAR = PythonEvalType.SQL_SCALAR_PANDAS_UDF
 
-    GROUP_MAP = PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF
+    GROUPED_MAP = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
 
 
 @since(1.3)
@@ -2191,20 +2191,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            Therefore, this can be used, for example, to ensure the length of each returned
            `pandas.Series`, and can not be used as the column length.
 
-    2. GROUP_MAP
+    2. GROUPED_MAP
 
-       A group map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
+       A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
        The returnType should be a :class:`StructType` describing the schema of the returned
        `pandas.DataFrame`.
        The length of the returned `pandas.DataFrame` can be arbitrary.
 
-       Group map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
+       Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
 
        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
        >>> df = spark.createDataFrame(
        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ...     ("id", "v"))  # doctest: +SKIP
-       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
+       >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
        ... def normalize(pdf):
        ...     v = pdf.v
        ...     return pdf.assign(v=(v - v.mean()) / v.std())
@@ -2254,20 +2254,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
             eval_type = returnType
         else:
             # @pandas_udf(dataType) or @pandas_udf(returnType=dataType)
-            eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+            eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
     else:
         return_type = returnType
 
         if functionType is not None:
             eval_type = functionType
         else:
-            eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+            eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
 
     if return_type is None:
         raise ValueError("Invalid returnType: returnType can not be None")
 
-    if eval_type not in [PythonEvalType.SQL_PANDAS_SCALAR_UDF,
-                         PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF]:
+    if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                         PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF]:
         raise ValueError("Invalid functionType: "
                          "functionType must be one the values from PandasUDFType")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/python/pyspark/sql/group.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index 22061b8..330faf2 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -212,14 +212,14 @@ class GroupedData(object):
         This function does not support partial aggregation, and requires shuffling all the data in
         the :class:`DataFrame`.
 
-        :param udf: a group map user-defined function returned by
-            :meth:`pyspark.sql.functions.pandas_udf`.
+        :param udf: a grouped map user-defined function returned by
+            :func:`pyspark.sql.functions.pandas_udf`.
 
         >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
         >>> df = spark.createDataFrame(
         ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
         ...     ("id", "v"))
-        >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
+        >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
         ... def normalize(pdf):
         ...     v = pdf.v
         ...     return pdf.assign(v=(v - v.mean()) / v.std())
@@ -239,9 +239,9 @@ class GroupedData(object):
         """
         # Columns are special because hasattr always return True
         if isinstance(udf, Column) or not hasattr(udf, 'func') \
-           or udf.evalType != PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
+           or udf.evalType != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
             raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
-                             "GROUP_MAP.")
+                             "GROUPED_MAP.")
         df = self._df
         udf_column = udf(*[df[col] for col in df.columns])
         jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a8e337d..0d5bc13 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3621,34 +3621,34 @@ class PandasUDFTests(ReusedSQLTestCase):
 
         udf = pandas_udf(lambda x: x, DoubleType())
         self.assertEqual(udf.returnType, DoubleType())
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
         self.assertEqual(udf.returnType, DoubleType())
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, 'double', PandasUDFType.SCALAR)
         self.assertEqual(udf.returnType, DoubleType())
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, StructType([StructField("v", DoubleType())]),
-                         PandasUDFType.GROUP_MAP)
+                         PandasUDFType.GROUPED_MAP)
         self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
-        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
+        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP)
         self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, 'v double',
-                         functionType=PandasUDFType.GROUP_MAP)
+                         functionType=PandasUDFType.GROUPED_MAP)
         self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
         udf = pandas_udf(lambda x: x, returnType='v double',
-                         functionType=PandasUDFType.GROUP_MAP)
+                         functionType=PandasUDFType.GROUPED_MAP)
         self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
-        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     def test_pandas_udf_decorator(self):
         from pyspark.rdd import PythonEvalType
@@ -3659,45 +3659,45 @@ class PandasUDFTests(ReusedSQLTestCase):
         def foo(x):
             return x
         self.assertEqual(foo.returnType, DoubleType())
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         @pandas_udf(returnType=DoubleType())
         def foo(x):
             return x
         self.assertEqual(foo.returnType, DoubleType())
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
         schema = StructType([StructField("v", DoubleType())])
 
-        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
+        @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
-        @pandas_udf('v double', PandasUDFType.GROUP_MAP)
+        @pandas_udf('v double', PandasUDFType.GROUPED_MAP)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
-        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
+        @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
         @pandas_udf(returnType='v double', functionType=PandasUDFType.SCALAR)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
 
-        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
+        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
         def foo(x):
             return x
         self.assertEqual(foo.returnType, schema)
-        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+        self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     def test_udf_wrong_arg(self):
         from pyspark.sql.functions import pandas_udf, PandasUDFType
@@ -3724,15 +3724,15 @@ class PandasUDFTests(ReusedSQLTestCase):
                     return 1
 
             with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
-                @pandas_udf(returnType=PandasUDFType.GROUP_MAP)
+                @pandas_udf(returnType=PandasUDFType.GROUPED_MAP)
                 def foo(df):
                     return df
             with self.assertRaisesRegexp(ValueError, 'Invalid returnType'):
-                @pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP)
+                @pandas_udf(returnType='double', functionType=PandasUDFType.GROUPED_MAP)
                 def foo(df):
                     return df
             with self.assertRaisesRegexp(ValueError, 'Invalid function'):
-                @pandas_udf(returnType='k int, v double', functionType=PandasUDFType.GROUP_MAP)
+                @pandas_udf(returnType='k int, v double', functionType=PandasUDFType.GROUPED_MAP)
                 def foo(k, v):
                     return k
 
@@ -3804,11 +3804,11 @@ class VectorizedUDFTests(ReusedSQLTestCase):
         random_pandas_udf = pandas_udf(
             lambda x: random.randint(6, 6) + x, IntegerType()).asNondeterministic()
         self.assertEqual(random_pandas_udf.deterministic, False)
-        self.assertEqual(random_pandas_udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(random_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
         nondeterministic_pandas_udf = self.spark.catalog.registerFunction(
             "randomPandasUDF", random_pandas_udf)
         self.assertEqual(nondeterministic_pandas_udf.deterministic, False)
-        self.assertEqual(nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
         [row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
         self.assertEqual(row[0], 7)
 
@@ -4206,7 +4206,7 @@ class VectorizedUDFTests(ReusedSQLTestCase):
             col('id').cast('int').alias('b'))
         original_add = pandas_udf(lambda x, y: x + y, IntegerType())
         self.assertEqual(original_add.deterministic, True)
-        self.assertEqual(original_add.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+        self.assertEqual(original_add.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
         new_add = self.spark.catalog.registerFunction("add1", original_add)
         res1 = df.select(new_add(col('a'), col('b')))
         res2 = self.spark.sql(
@@ -4243,20 +4243,20 @@ class GroupbyApplyTests(ReusedSQLTestCase):
                  StructField('v', IntegerType()),
                  StructField('v1', DoubleType()),
                  StructField('v2', LongType())]),
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
 
         result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
         expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
         self.assertFramesEqual(expected, result)
 
-    def test_register_group_map_udf(self):
+    def test_register_grouped_map_udf(self):
         from pyspark.sql.functions import pandas_udf, PandasUDFType
 
-        foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUP_MAP)
+        foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(ValueError, 'f must be either SQL_BATCHED_UDF or '
-                                                     'SQL_PANDAS_SCALAR_UDF'):
+                                                     'SQL_SCALAR_PANDAS_UDF'):
                 self.spark.catalog.registerFunction("foo_udf", foo_udf)
 
     def test_decorator(self):
@@ -4265,7 +4265,7 @@ class GroupbyApplyTests(ReusedSQLTestCase):
 
         @pandas_udf(
             'id long, v int, v1 double, v2 long',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
         def foo(pdf):
             return pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id)
@@ -4281,7 +4281,7 @@ class GroupbyApplyTests(ReusedSQLTestCase):
         foo = pandas_udf(
             lambda pdf: pdf,
             'id long, v double',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
 
         result = df.groupby('id').apply(foo).sort('id').toPandas()
@@ -4295,7 +4295,7 @@ class GroupbyApplyTests(ReusedSQLTestCase):
 
         @pandas_udf(
             'id long, v int, norm double',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
         def normalize(pdf):
             v = pdf.v
@@ -4314,7 +4314,7 @@ class GroupbyApplyTests(ReusedSQLTestCase):
 
         @pandas_udf(
             'id long, v int, norm double',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
         def normalize(pdf):
             v = pdf.v
@@ -4334,7 +4334,7 @@ class GroupbyApplyTests(ReusedSQLTestCase):
         foo_udf = pandas_udf(
             lambda pdf: pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id),
             'id long, v int, v1 double, v2 long',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
 
         result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
@@ -4348,7 +4348,7 @@ class GroupbyApplyTests(ReusedSQLTestCase):
         foo = pandas_udf(
             lambda pdf: pdf,
             'id long, v map<int, int>',
-            PandasUDFType.GROUP_MAP
+            PandasUDFType.GROUPED_MAP
         )
 
         with QuietTest(self.sc):
@@ -4374,7 +4374,7 @@ class GroupbyApplyTests(ReusedSQLTestCase):
             with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
                 df.groupby('id').apply(
                     pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())])))
-            with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUP_MAP'):
+            with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUPED_MAP'):
                 df.groupby('id').apply(
                     pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())]),
                                PandasUDFType.SCALAR))
@@ -4385,7 +4385,7 @@ class GroupbyApplyTests(ReusedSQLTestCase):
             [StructField("id", LongType(), True),
              StructField("map", MapType(StringType(), IntegerType()), True)])
         df = self.spark.createDataFrame([(1, None,)], schema=schema)
-        f = pandas_udf(lambda x: x, df.schema, PandasUDFType.GROUP_MAP)
+        f = pandas_udf(lambda x: x, df.schema, PandasUDFType.GROUPED_MAP)
         with QuietTest(self.sc):
             with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
                 df.groupby('id').apply(f).collect()

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/python/pyspark/sql/udf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 0795255..82a28c8 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -36,24 +36,24 @@ def _wrap_function(sc, func, returnType):
 
 def _create_udf(f, returnType, evalType):
 
-    if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF or \
-            evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
+    if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                    PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF):
         import inspect
         from pyspark.sql.utils import require_minimum_pyarrow_version
 
         require_minimum_pyarrow_version()
         argspec = inspect.getargspec(f)
 
-        if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF and len(argspec.args) == 0 and \
+        if evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF and len(argspec.args) == 0 and \
                 argspec.varargs is None:
             raise ValueError(
                 "Invalid function: 0-arg pandas_udfs are not supported. "
                 "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
             )
 
-        if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and len(argspec.args) != 1:
+        if evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF and len(argspec.args) != 1:
             raise ValueError(
-                "Invalid function: pandas_udfs with function type GROUP_MAP "
+                "Invalid function: pandas_udfs with function type GROUPED_MAP "
                 "must take a single arg that is a pandas DataFrame."
             )
 
@@ -109,10 +109,10 @@ class UserDefinedFunction(object):
             else:
                 self._returnType_placeholder = _parse_datatype_string(self._returnType)
 
-        if self.evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF \
+        if self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF \
                 and not isinstance(self._returnType_placeholder, StructType):
             raise ValueError("Invalid returnType: returnType must be a StructType for "
-                             "pandas_udf with function type GROUP_MAP")
+                             "pandas_udf with function type GROUPED_MAP")
 
         return self._returnType_placeholder
 
@@ -285,9 +285,9 @@ class UDFRegistration(object):
                     "Invalid returnType: data type can not be specified when f is"
                     "a user-defined function, but got %s." % returnType)
             if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
-                                  PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
+                                  PythonEvalType.SQL_SCALAR_PANDAS_UDF]:
                 raise ValueError(
-                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
+                    "Invalid f: f must be either SQL_BATCHED_UDF or SQL_SCALAR_PANDAS_UDF")
             register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
                                                evalType=f.evalType,
                                                deterministic=f.deterministic)

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index e6737ae..8c41c3d 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -74,7 +74,7 @@ def wrap_udf(f, return_type):
         return lambda *a: f(*a)
 
 
-def wrap_pandas_scalar_udf(f, return_type):
+def wrap_scalar_pandas_udf(f, return_type):
     arrow_return_type = to_arrow_type(return_type)
 
     def verify_result_length(*a):
@@ -90,7 +90,7 @@ def wrap_pandas_scalar_udf(f, return_type):
     return lambda *a: (verify_result_length(*a), arrow_return_type)
 
 
-def wrap_pandas_group_map_udf(f, return_type):
+def wrap_grouped_map_pandas_udf(f, return_type):
     def wrapped(*series):
         import pandas as pd
 
@@ -122,10 +122,10 @@ def read_single_udf(pickleSer, infile, eval_type):
             row_func = chain(row_func, f)
 
     # the last returnType will be the return type of UDF
-    if eval_type == PythonEvalType.SQL_PANDAS_SCALAR_UDF:
-        return arg_offsets, wrap_pandas_scalar_udf(row_func, return_type)
-    elif eval_type == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
-        return arg_offsets, wrap_pandas_group_map_udf(row_func, return_type)
+    if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
+        return arg_offsets, wrap_scalar_pandas_udf(row_func, return_type)
+    elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
+        return arg_offsets, wrap_grouped_map_pandas_udf(row_func, return_type)
     else:
         return arg_offsets, wrap_udf(row_func, return_type)
 
@@ -148,8 +148,8 @@ def read_udfs(pickleSer, infile, eval_type):
 
     func = lambda _, it: map(mapper, it)
 
-    if eval_type == PythonEvalType.SQL_PANDAS_SCALAR_UDF \
-       or eval_type == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
+    if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                     PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF):
         timezone = utf8_deserializer.loads(infile)
         ser = ArrowStreamPandasSerializer(timezone)
     else:

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index a009c00..4e0785a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -450,8 +450,8 @@ class RelationalGroupedDataset protected[sql](
    * workers.
    */
   private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
-    require(expr.evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF,
-      "Must pass a group map udf")
+    require(expr.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+      "Must pass a grouped map udf")
     require(expr.dataType.isInstanceOf[StructType],
       "The returnType of the udf must be a StructType")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 47b146f..c4de214 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -81,7 +81,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 
     val columnarBatchIter = new ArrowPythonRunner(
         funcs, bufferSize, reuseWorker,
-        PythonEvalType.SQL_PANDAS_SCALAR_UDF, argOffsets, schema,
+        PythonEvalType.SQL_SCALAR_PANDAS_UDF, argOffsets, schema,
         sessionLocalTimeZone, pandasRespectSessionTimeZone)
       .compute(batchIter, context.partitionId(), context)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index e6616d0..7852152 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -152,7 +152,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
         if (validUdfs.nonEmpty) {
           require(validUdfs.forall(udf =>
             udf.evalType == PythonEvalType.SQL_BATCHED_UDF ||
-            udf.evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF
+            udf.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
           ), "Can only extract scalar vectorized udf or sql batch udf")
 
           val resultAttrs = udfs.zipWithIndex.map { case (u, i) =>
@@ -160,7 +160,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
           }
 
           val evaluation = validUdfs.partition(
-            _.evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF
+            _.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
           ) match {
             case (vectorizedUdfs, plainUdfs) if plainUdfs.isEmpty =>
               ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child)

http://git-wip-us.apache.org/repos/asf/spark/blob/7b9fe086/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index 59db66b..c798fe5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -96,7 +96,7 @@ case class FlatMapGroupsInPandasExec(
 
       val columnarBatchIter = new ArrowPythonRunner(
         chainedFunc, bufferSize, reuseWorker,
-        PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
+        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, argOffsets, schema,
         sessionLocalTimeZone, pandasRespectSessionTimeZone)
           .compute(grouped, context.partitionId(), context)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org