You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by icexelloss <gi...@git.apache.org> on 2017/11/01 16:56:33 UTC

[GitHub] spark pull request #19630: wip: Introduce function type argument in pandas_u...

GitHub user icexelloss opened a pull request:

    https://github.com/apache/spark/pull/19630

    wip: Introduce function type argument in pandas_udf

    ## What changes were proposed in this pull request?
    
    * Add a "function type" argument to pandas_udf.
    * Refactor udf related code from pyspark.sql.functions to pyspark.sql.udf
    * Merge "PythonUdfType" and "PythonEvalType" into a single class "PythonEvalType"
    
    
    ## Design doc
    https://docs.google.com/document/d/1KlLaa-xJ3oz28xlEJqXyCAHU3dwFYkFs_ixcUXrJNTc/edit
    
    ## How was this patch tested?
    
    Added PandasUDFTests
    
    ## TODO:
    * [ ] Implement proper enum type for `PandasUdfType`
    * [ ] Update documentation
    * [ ] Add more tests to PandasUDFTests


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/icexelloss/spark spark-22409-pandas-udf-type

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19630.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19630
    
----
commit a78a5e9849b62059c72cf2c87d79a6bee83316ff
Author: Li Jin <ic...@gmail.com>
Date:   2017-11-01T16:50:19Z

    initial commit

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151333891
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2208,26 +2089,39 @@ def udf(f=None, returnType=StringType()):
         |         8|      JOHN DOE|          22|
         +----------+--------------+------------+
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.NORMAL_UDF)
    +    # decorator @udf, @udf(), @udf(dataType())
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +        return_type = f or returnType
    +        return functools.partial(_create_udf, returnType=return_type,
    +                                 evalType=PythonEvalType.SQL_BATCHED_UDF)
    +    else:
    +        return _create_udf(f=f, returnType=returnType,
    +                           evalType=PythonEvalType.SQL_BATCHED_UDF)
     
     
     @since(2.3)
    -def pandas_udf(f=None, returnType=StringType()):
    +def pandas_udf(f=None, returnType=None, functionType=None):
         """
         Creates a vectorized user defined function (UDF).
     
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
    +    :param functionType: an enum value in :class:`pyspark.sql.functions.PandasUdfType`.
    +                         Default: SCALAR.
     
    -    The user-defined function can define one of the following transformations:
    +    The function type of the UDF can be one of the following:
     
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    +    1. SCALAR
     
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    +       A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`.
            The returnType should be a primitive data type, e.g., `DoubleType()`.
            The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
     
    +       Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +       :meth:`pyspark.sql.DataFrame.select`.
    +
            >>> from pyspark.sql.types import IntegerType, StringType
            >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
            >>> @pandas_udf(returnType=StringType())
    --- End diff --
    
    In this doctest, there are two pandas_udf. Please explicitly assign `PandasUDFType.SCALAR` as the `functionType` of one of udfs.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83853/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83899 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83899/testReport)** for PR 19630 at commit [`3af57d5`](https://github.com/apache/spark/commit/3af57d56a3e13359dae6578179085173924dab3e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148807539
  
    --- Diff: python/pyspark/sql/group.py ---
    @@ -214,11 +214,11 @@ def apply(self, udf):
     
             :param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
     
    -        >>> from pyspark.sql.functions import pandas_udf
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUdfType
             >>> df = spark.createDataFrame(
             ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
             ...     ("id", "v"))
    -        >>> @pandas_udf(returnType=df.schema)
    +        >>> @pandas_udf(returnType=df.schema, functionType=PandasUdfType.GROUP_FLATMAP)
    --- End diff --
    
    I think `GROUP_MAP` is better here, think about `RDD.mapPartitions`, we pass a function that takes an `Iterator`(group) and returns another `Iterator`(group). `GROUP_TRANSFORM` is also fine.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151476420
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2208,26 +2089,39 @@ def udf(f=None, returnType=StringType()):
         |         8|      JOHN DOE|          22|
         +----------+--------------+------------+
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.NORMAL_UDF)
    +    # decorator @udf, @udf(), @udf(dataType())
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +        return_type = f or returnType
    +        return functools.partial(_create_udf, returnType=return_type,
    +                                 evalType=PythonEvalType.SQL_BATCHED_UDF)
    +    else:
    +        return _create_udf(f=f, returnType=returnType,
    +                           evalType=PythonEvalType.SQL_BATCHED_UDF)
     
     
     @since(2.3)
    -def pandas_udf(f=None, returnType=StringType()):
    +def pandas_udf(f=None, returnType=None, functionType=None):
         """
         Creates a vectorized user defined function (UDF).
     
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
    +    :param functionType: an enum value in :class:`pyspark.sql.functions.PandasUdfType`.
    +                         Default: SCALAR.
     
    -    The user-defined function can define one of the following transformations:
    +    The function type of the UDF can be one of the following:
     
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    +    1. SCALAR
     
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    +       A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`.
            The returnType should be a primitive data type, e.g., `DoubleType()`.
            The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
     
    +       Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +       :meth:`pyspark.sql.DataFrame.select`.
    +
            >>> from pyspark.sql.types import IntegerType, StringType
            >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
            >>> @pandas_udf(returnType=StringType())
    --- End diff --
    
    Added.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    OK, mine was, with this diff:
    
    ```diff
    --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
    +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
    @@ -38,7 +38,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
       // only works on UNIX-based systems now because it uses signals for child management, so we can
       // also fall back to launching workers (pyspark/worker.py) directly.
    -  val useDaemon = !System.getProperty("os.name").startsWith("Windows")
    +  val useDaemon = false
    
       var daemon: Process = null
       val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
    ```
    
    ```bash
    pip install coverage
    # Build Spark (http://spark.apache.org/docs/latest/building-spark.html)
    rm python/lib/pyspark.zip
    rm -fr .coverage
    rm -fr coverage_html
    echo "
    #!/usr/bin/env bash
    coverage run -p \$@
    " > coverage_python
    chmod 755 coverage_python
    PATH=`pwd`:$PATH PYSPARK_PYTHON=coverage_python SPARK_TESTING=1 bin/pyspark pyspark.sql.tests VectorizedUDFTests
    coverage combine
    coverage html -d coverage_html -i
    open coverage_html
    # Open up index.html in your browser.
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148803174
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2208,16 +2093,26 @@ def udf(f=None, returnType=StringType()):
         |         8|      JOHN DOE|          22|
         +----------+--------------+------------+
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.NORMAL_UDF)
    +    # decorator @udf, @udf(), @udf(dataType())
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +        return_type = f or returnType
    +        return functools.partial(_create_udf, returnType=return_type,
    +                                 udfType=PythonEvalType.SQL_BATCHED_UDF)
    +    else:
    +        return _create_udf(f=f, returnType=returnType,
    +                           udfType=PythonEvalType.SQL_BATCHED_UDF)
     
     
     @since(2.3)
    -def pandas_udf(f=None, returnType=StringType()):
    +def pandas_udf(f=None, returnType=None, functionType=None):
    --- End diff --
    
    shall we specify the default value `PythonEvalType.PANDAS_SCALAR_UDF` here?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148804761
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2279,7 +2174,36 @@ def pandas_udf(f=None, returnType=StringType()):
     
         .. note:: The user-defined function must be deterministic.
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +    # decorator @pandas_udf(dataType(), functionType)
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +
    +        return_type = f or returnType
    +
    +        if return_type is None:
    +            raise ValueError("Must specify return type.")
    +
    +        if functionType is not None:
    +            # @pandas_udf(dataType, functionType=functionType)
    +            # @pandas_udf(returnType=dataType, functionType=functionType)
    +            udf_type = functionType
    +        elif returnType is not None and isinstance(returnType, int):
    --- End diff --
    
    I'm a little confused by this branch, do you mean the `returnType` can actually be `udfType` here?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83862 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83862/testReport)** for PR 19630 at commit [`bdd3e7a`](https://github.com/apache/spark/commit/bdd3e7ab62b158abc301c21f5199be8174af8fb9).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151475971
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -0,0 +1,155 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +"""
    +User-defined function related classes and functions
    +"""
    +import functools
    +
    +from pyspark import SparkContext
    +from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType
    +from pyspark.sql.column import Column, _to_java_column, _to_seq
    +from pyspark.sql.types import StringType, DataType, StructType, _parse_datatype_string
    +
    +
    +def _wrap_function(sc, func, returnType):
    +    command = (func, returnType)
    +    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    +    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    +                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    +
    +
    +def _create_udf(f, returnType, evalType):
    +    if evalType == PythonEvalType.PANDAS_SCALAR_UDF:
    +        import inspect
    +        argspec = inspect.getargspec(f)
    +        if len(argspec.args) == 0 and argspec.varargs is None:
    +            raise ValueError(
    +                "Invalid function: 0-arg pandas_udfs are not supported. "
    +                "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    +            )
    +
    +    elif evalType == PythonEvalType.PANDAS_GROUP_MAP_UDF:
    +        import inspect
    +        argspec = inspect.getargspec(f)
    +        if len(argspec.args) != 1:
    +            raise ValueError(
    +                "Invalid function: pandas_udf with function type GROUP_MAP "
    +                "must take a single arg that is a pandas DataFrame."
    +            )
    +
    +    udf_obj = UserDefinedFunction(f, returnType=returnType, name=None, evalType=evalType)
    --- End diff --
    
    I added the comment, but keep the `name=None` because I think this is more explicit.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148806731
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -56,6 +56,22 @@
     __all__ = ["RDD"]
     
     
    +class PythonEvalType(object):
    +    """
    +    Evaluation type of python rdd.
    +
    +    These values are internal to PySpark.
    +    """
    +    NON_UDF = 0
    +
    +    SQL_BATCHED_UDF = 100
    +
    +    PANDAS_SCALAR_UDF = 200
    +    PANDAS_GROUP_MAP_UDF = 201
    +    PANDAS_GROUP_AGGREGATE_UDF = 202
    +    PANDAS_GROUP_FLATMAP_UDF = 203
    --- End diff --
    
    This is new, what's the mean of flatmap here?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151393292
  
    --- Diff: python/pyspark/sql/group.py ---
    @@ -214,15 +214,15 @@ def apply(self, udf):
     
             :param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
     
    -        >>> from pyspark.sql.functions import pandas_udf
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
             >>> df = spark.createDataFrame(
             ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
             ...     ("id", "v"))
    -        >>> @pandas_udf(returnType=df.schema)
    +        >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
    --- End diff --
    
    sounds good, but may bring some trouble when trying to figure out which one is data type and which one is function type...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151166483
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2050,7 +2050,6 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -@since(2.3)
    --- End diff --
    
    We have to remove this because with python2.7, you cannot rewrite the docstring for classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151377854
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,88 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEquals(udf.returnType, DoubleType())
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    +        self.assertEquals(udf.returnType, DoubleType())
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
    +        self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, returnType='v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +    def test_pandas_udf_decorator(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        from pyspark.sql.types import StructType, StructField, DoubleType
    +
    +        @pandas_udf(DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, DoubleType())
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        @pandas_udf(returnType=DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, DoubleType())
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        schema = StructType([StructField("v", DoubleType())])
    +
    +        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, schema)
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, schema)
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, schema)
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +    def test_udf_wrong_arg(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(ValueError, 'return type'):
    +                @pandas_udf(PandasUDFType.GROUP_MAP)
    --- End diff --
    
    a little insane but how about `@pandas_udf(returnType=PandasUDFType.GROUP_MAP)`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83794/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Thanks @cloud-fan. I will continue to finish the TODO items. Decide the best naming probably needs a bit more careful thinking.
    
    I can remove unimplemented types. However, I think it'd be good if we can include those when thinking about overall naming scheme  of function types.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83906 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83906/testReport)** for PR 19630 at commit [`565ba23`](https://github.com/apache/spark/commit/565ba237d41d11d8b73b0a79d39d8b983dcc51f2).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151524338
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,92 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    --- End diff --
    
    can we try `double`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    LGTM


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83699 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83699/testReport)** for PR 19630 at commit [`ee8009d`](https://github.com/apache/spark/commit/ee8009d9a98f65c1c821692e2c7aa05c25baaca9).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83956/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151382381
  
    --- Diff: python/pyspark/sql/group.py ---
    @@ -214,15 +214,15 @@ def apply(self, udf):
     
             :param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
     
    -        >>> from pyspark.sql.functions import pandas_udf
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
             >>> df = spark.createDataFrame(
             ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
             ...     ("id", "v"))
    -        >>> @pandas_udf(returnType=df.schema)
    +        >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
    --- End diff --
    
    Just an idea not blocking this PR but I think it might be nicer if we could take a string too for `PandasUDFType.GROUP_MAP` and etc.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Everyone, I don't have more changes to the PR. I think all comments are addressed at this point. Please let me know if I missed anything or there are more comments. Thank you!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150311184
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -0,0 +1,136 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +"""
    +User-defined function related classes and functions
    +"""
    +import functools
    +
    +from pyspark import SparkContext
    +from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType
    +from pyspark.sql.column import Column, _to_java_column, _to_seq
    +from pyspark.sql.types import StringType, DataType, _parse_datatype_string
    +
    +
    +def _wrap_function(sc, func, returnType):
    +    command = (func, returnType)
    +    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    +    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    +                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    +
    +
    +def _create_udf(f, *, returnType, udfType):
    +    if udfType in (PythonEvalType.PANDAS_SCALAR_UDF, PythonEvalType.PANDAS_GROUP_FLATMAP_UDF):
    +        import inspect
    +        argspec = inspect.getargspec(f)
    +        if len(argspec.args) == 0 and argspec.varargs is None:
    +            raise ValueError(
    +                "0-arg pandas_udfs are not supported. "
    +                "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    +            )
    +    udf_obj = UserDefinedFunction(f, returnType=returnType, name=None, udfType=udfType)
    +    return udf_obj._wrapped()
    +
    +
    +class UserDefinedFunction(object):
    +    """
    +    User defined function in Python
    +
    +    .. versionadded:: 1.3
    +    """
    +    def __init__(self, func,
    +                 returnType=StringType(), name=None,
    +                 udfType=PythonEvalType.SQL_BATCHED_UDF):
    +        if not callable(func):
    +            raise TypeError(
    +                "Not a function or callable (__call__ is not defined): "
    +                "{0}".format(type(func)))
    +
    +        self.func = func
    +        self._returnType = returnType
    +        # Stores UserDefinedPythonFunctions jobj, once initialized
    +        self._returnType_placeholder = None
    +        self._judf_placeholder = None
    +        self._name = name or (
    +            func.__name__ if hasattr(func, '__name__')
    +            else func.__class__.__name__)
    +        self.udfType = udfType
    +
    +
    +    @property
    +    def returnType(self):
    +        # This makes sure this is called after SparkContext is initialized.
    +        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    +        if self._returnType_placeholder is None:
    +            if isinstance(self._returnType, DataType):
    +                self._returnType_placeholder = self._returnType
    +            else:
    +                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    +        return self._returnType_placeholder
    +
    +    @property
    +    def _judf(self):
    +        # It is possible that concurrent access, to newly created UDF,
    +        # will initialize multiple UserDefinedPythonFunctions.
    +        # This is unlikely, doesn't affect correctness,
    +        # and should have a minimal performance impact.
    +        if self._judf_placeholder is None:
    +            self._judf_placeholder = self._create_judf()
    +        return self._judf_placeholder
    +
    +    def _create_judf(self):
    +        from pyspark.sql import SparkSession
    +
    +        spark = SparkSession.builder.getOrCreate()
    +        sc = spark.sparkContext
    +
    +        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    +        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    +        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    +            self._name, wrapped_func, jdt, self.udfType)
    +        return judf
    +
    +    def __call__(self, *cols):
    +        judf = self._judf
    +        sc = SparkContext._active_spark_context
    +        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    +
    +    def _wrapped(self):
    +        """
    +        Wrap this udf with a function and attach docstring from func
    +        """
    +
    +        # It is possible for a callable instance without __name__ attribute or/and
    +        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    +        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    +        # function. So, we take out these attribute names from the default names to set and
    +        # then manually assign it after being wrapped.
    +        assignments = tuple(
    +            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    +
    +        @functools.wraps(self.func, assigned=assignments)
    +        def wrapper(*args):
    +            return self(*args)
    +
    +        wrapper.__name__ = self._name
    +        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    +                              else self.func.__class__.__module__)
    +
    +        wrapper.func = self.func
    +        wrapper.returnType = self.returnType
    +        wrapper.udfType = self.udfType
    +
    +        return wrapper
    --- End diff --
    
    Added


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83699 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83699/testReport)** for PR 19630 at commit [`ee8009d`](https://github.com/apache/spark/commit/ee8009d9a98f65c1c821692e2c7aa05c25baaca9).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151448145
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -56,6 +56,20 @@
     __all__ = ["RDD"]
     
     
    +class PythonEvalType(object):
    +    """
    +    Evaluation type of python rdd.
    +
    +    These values are internal to PySpark.
    --- End diff --
    
    Right. Let me add it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83794 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83794/testReport)** for PR 19630 at commit [`464a4e8`](https://github.com/apache/spark/commit/464a4e899045c7ea022549cb6bf01a2c44d6989a).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151605052
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3348,18 +3385,6 @@ def test_vectorized_udf_null_string(self):
             res = df.select(str_f(col('str')))
             self.assertEquals(df.collect(), res.collect())
     
    -    def test_vectorized_udf_zero_parameter(self):
    --- End diff --
    
    Moved to PandasUDFTests


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83942 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83942/testReport)** for PR 19630 at commit [`1f2c47b`](https://github.com/apache/spark/commit/1f2c47b569bcfa3f7ca7f974fee7cbdc21969623).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151493167
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,92 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, returnType='v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_pandas_udf_decorator(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        from pyspark.sql.types import StructType, StructField, DoubleType
    +
    +        @pandas_udf(DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        @pandas_udf(returnType=DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        schema = StructType([StructField("v", DoubleType())])
    +
    +        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_udf_wrong_arg(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(ValueError, 'return type'):
    +                @pandas_udf(PandasUDFType.GROUP_MAP)
    +                def foo(df):
    +                    return df
    +            with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
    +                @pandas_udf(returnType=PandasUDFType.GROUP_MAP)
    +                def foo(df):
    +                    return df
    +            with self.assertRaisesRegexp(ValueError, 'Invalid returnType'):
    +                @pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP)
    --- End diff --
    
    This is not an issue of DoubleType vs "double", but rather GROUP_MAP needs to take a returnType that is StructType or a string that can be parsed to a StructType


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83956 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83956/testReport)** for PR 19630 at commit [`cf1d1ca`](https://github.com/apache/spark/commit/cf1d1caa4f41c6bcf565cfc5b9e9901d94f56af3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83698 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83698/testReport)** for PR 19630 at commit [`88a6a9a`](https://github.com/apache/spark/commit/88a6a9aa4127f5a5d6dad868e279d993ea7e1274).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148810396
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -0,0 +1,136 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +"""
    +User-defined function related classes and functions
    --- End diff --
    
    Yes


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83898/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151448531
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -137,15 +138,18 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
               udf.references.subsetOf(child.outputSet)
             }
             if (validUdfs.nonEmpty) {
    -          if (validUdfs.exists(_.pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF)) {
    -            throw new IllegalArgumentException("Can not use grouped vectorized UDFs")
    -          }
    +          require(validUdfs.forall(udf =>
    +            udf.evalType == PythonEvalType.SQL_BATCHED_UDF ||
    +            udf.evalType == PythonEvalType.PANDAS_SCALAR_UDF
    +          ), "Can only extract scalar vectorized udf or sql batch udf")
    --- End diff --
    
    I think "equal" comparison is better than "not equal", because we might add new types.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83959/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148957066
  
    --- Diff: python/pyspark/sql/group.py ---
    @@ -260,19 +259,19 @@ def wrapped(*cols):
                 import pandas as pd
                 result = func(pd.concat(cols, axis=1, keys=columns))
                 if not isinstance(result, pd.DataFrame):
    -                raise TypeError("Return type of the user-defined function should be "
    -                                "Pandas.DataFrame, but is {}".format(type(result)))
    +                raise TypeError('Return type of the user-defined function should be '
    --- End diff --
    
    BTW, I think both `"` and `'` are fine. Let's avoid such changes here unless there is a strong reason for it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150662476
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,132 +2051,13 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    -    """
    -    User defined function in Python
    -
    -    .. versionadded:: 1.3
    -    """
    -    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
    -        if not callable(func):
    -            raise TypeError(
    -                "Not a function or callable (__call__ is not defined): "
    -                "{0}".format(type(func)))
    -
    -        self.func = func
    -        self._returnType = returnType
    -        # Stores UserDefinedPythonFunctions jobj, once initialized
    -        self._returnType_placeholder = None
    -        self._judf_placeholder = None
    -        self._name = name or (
    -            func.__name__ if hasattr(func, '__name__')
    -            else func.__class__.__name__)
    -        self.pythonUdfType = pythonUdfType
    -
    -    @property
    -    def returnType(self):
    -        # This makes sure this is called after SparkContext is initialized.
    -        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    -        if self._returnType_placeholder is None:
    -            if isinstance(self._returnType, DataType):
    -                self._returnType_placeholder = self._returnType
    -            else:
    -                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    -        return self._returnType_placeholder
    -
    -    @property
    -    def _judf(self):
    -        # It is possible that concurrent access, to newly created UDF,
    -        # will initialize multiple UserDefinedPythonFunctions.
    -        # This is unlikely, doesn't affect correctness,
    -        # and should have a minimal performance impact.
    -        if self._judf_placeholder is None:
    -            self._judf_placeholder = self._create_judf()
    -        return self._judf_placeholder
    -
    -    def _create_judf(self):
    -        from pyspark.sql import SparkSession
    -
    -        spark = SparkSession.builder.getOrCreate()
    -        sc = spark.sparkContext
    -
    -        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    -        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    -        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    -            self._name, wrapped_func, jdt, self.pythonUdfType)
    -        return judf
    -
    -    def __call__(self, *cols):
    -        judf = self._judf
    -        sc = SparkContext._active_spark_context
    -        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    -
    -    def _wrapped(self):
    -        """
    -        Wrap this udf with a function and attach docstring from func
    -        """
    -
    -        # It is possible for a callable instance without __name__ attribute or/and
    -        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    -        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    -        # function. So, we take out these attribute names from the default names to set and
    -        # then manually assign it after being wrapped.
    -        assignments = tuple(
    -            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    -
    -        @functools.wraps(self.func, assigned=assignments)
    -        def wrapper(*args):
    -            return self(*args)
    -
    -        wrapper.__name__ = self._name
    -        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    -                              else self.func.__class__.__module__)
    -
    -        wrapper.func = self.func
    -        wrapper.returnType = self.returnType
    -        wrapper.pythonUdfType = self.pythonUdfType
    -
    -        return wrapper
    -
    -
    -def _create_udf(f, returnType, pythonUdfType):
    -
    -    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    -        if pythonUdfType == PythonUdfType.PANDAS_UDF:
    -            import inspect
    -            argspec = inspect.getargspec(f)
    -            if len(argspec.args) == 0 and argspec.varargs is None:
    -                raise ValueError(
    -                    "0-arg pandas_udfs are not supported. "
    -                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    -                )
    -        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
    -        return udf_obj._wrapped()
    -
    -    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    -    if f is None or isinstance(f, (str, DataType)):
    -        # If DataType has been passed as a positional argument
    -        # for decorator use it as a returnType
    -        return_type = f or returnType
    -        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
    -    else:
    -        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
    +class PandasUDFType(enum.Enum):
    --- End diff --
    
    This is in python built-in library after python3.4. It's not built-in for python2.x, but available through "enum34" package.
    
    There are other ways to implement enum without enum34, but all of them are either complicated (using type()) or not as type-safe (using ints).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83965/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    To be honest, I know a super hacky funny workaround I use sometimes but .. want to know if there is any easy way or cleaner way ..


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148811820
  
    --- Diff: python/pyspark/sql/group.py ---
    @@ -214,11 +214,11 @@ def apply(self, udf):
     
             :param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
     
    -        >>> from pyspark.sql.functions import pandas_udf
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUdfType
             >>> df = spark.createDataFrame(
             ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
             ...     ("id", "v"))
    -        >>> @pandas_udf(returnType=df.schema)
    +        >>> @pandas_udf(returnType=df.schema, functionType=PandasUdfType.GROUP_FLATMAP)
    --- End diff --
    
    I am not tied to the current naming. Let me think a bit more on this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148800791
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -56,6 +56,22 @@
     __all__ = ["RDD"]
     
     
    +class PythonEvalType(object):
    +    """
    +    Evaluation type of python rdd.
    +
    +    These values are internal to PySpark.
    --- End diff --
    
    it's weird to say it's internal and then ask users to use it in public udf API...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148807433
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2208,16 +2093,26 @@ def udf(f=None, returnType=StringType()):
         |         8|      JOHN DOE|          22|
         +----------+--------------+------------+
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.NORMAL_UDF)
    +    # decorator @udf, @udf(), @udf(dataType())
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +        return_type = f or returnType
    +        return functools.partial(_create_udf, returnType=return_type,
    +                                 udfType=PythonEvalType.SQL_BATCHED_UDF)
    +    else:
    +        return _create_udf(f=f, returnType=returnType,
    +                           udfType=PythonEvalType.SQL_BATCHED_UDF)
     
     
     @since(2.3)
    -def pandas_udf(f=None, returnType=StringType()):
    +def pandas_udf(f=None, returnType=None, functionType=None):
    --- End diff --
    
    The default value is effectively `PandasUdfType.SCALAR`
    
    This is bit tricky to put default value in the function args because of decorator makes the branching logic too complicated.
    
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83698 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83698/testReport)** for PR 19630 at commit [`88a6a9a`](https://github.com/apache/spark/commit/88a6a9aa4127f5a5d6dad868e279d993ea7e1274).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class PandasUDFType(enum.Enum):`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150571488
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2279,7 +2172,38 @@ def pandas_udf(f=None, returnType=StringType()):
     
         .. note:: The user-defined function must be deterministic.
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +    # decorator @pandas_udf(dataType(), functionType)
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +
    +        return_type = f or returnType
    +
    +        if return_type is None:
    +            raise ValueError("Must specify return type.")
    +
    +        if functionType is not None:
    +            # @pandas_udf(dataType, functionType=functionType)
    +            # @pandas_udf(returnType=dataType, functionType=functionType)
    +            eval_type = functionType.value
    +        elif returnType is not None and isinstance(returnType, PandasUDFType):
    +            # @pandas_udf(dataType, functionType)
    +            eval_type = returnType.value
    +        else:
    +            # @pandas_udf(dataType) or @pandas_udf(returnType=dataType)
    --- End diff --
    
    how about `@pandas_udf(functionType)`? do we throw exception?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150699602
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,132 +2051,13 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    -    """
    -    User defined function in Python
    -
    -    .. versionadded:: 1.3
    -    """
    -    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
    -        if not callable(func):
    -            raise TypeError(
    -                "Not a function or callable (__call__ is not defined): "
    -                "{0}".format(type(func)))
    -
    -        self.func = func
    -        self._returnType = returnType
    -        # Stores UserDefinedPythonFunctions jobj, once initialized
    -        self._returnType_placeholder = None
    -        self._judf_placeholder = None
    -        self._name = name or (
    -            func.__name__ if hasattr(func, '__name__')
    -            else func.__class__.__name__)
    -        self.pythonUdfType = pythonUdfType
    -
    -    @property
    -    def returnType(self):
    -        # This makes sure this is called after SparkContext is initialized.
    -        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    -        if self._returnType_placeholder is None:
    -            if isinstance(self._returnType, DataType):
    -                self._returnType_placeholder = self._returnType
    -            else:
    -                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    -        return self._returnType_placeholder
    -
    -    @property
    -    def _judf(self):
    -        # It is possible that concurrent access, to newly created UDF,
    -        # will initialize multiple UserDefinedPythonFunctions.
    -        # This is unlikely, doesn't affect correctness,
    -        # and should have a minimal performance impact.
    -        if self._judf_placeholder is None:
    -            self._judf_placeholder = self._create_judf()
    -        return self._judf_placeholder
    -
    -    def _create_judf(self):
    -        from pyspark.sql import SparkSession
    -
    -        spark = SparkSession.builder.getOrCreate()
    -        sc = spark.sparkContext
    -
    -        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    -        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    -        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    -            self._name, wrapped_func, jdt, self.pythonUdfType)
    -        return judf
    -
    -    def __call__(self, *cols):
    -        judf = self._judf
    -        sc = SparkContext._active_spark_context
    -        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    -
    -    def _wrapped(self):
    -        """
    -        Wrap this udf with a function and attach docstring from func
    -        """
    -
    -        # It is possible for a callable instance without __name__ attribute or/and
    -        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    -        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    -        # function. So, we take out these attribute names from the default names to set and
    -        # then manually assign it after being wrapped.
    -        assignments = tuple(
    -            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    -
    -        @functools.wraps(self.func, assigned=assignments)
    -        def wrapper(*args):
    -            return self(*args)
    -
    -        wrapper.__name__ = self._name
    -        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    -                              else self.func.__class__.__module__)
    -
    -        wrapper.func = self.func
    -        wrapper.returnType = self.returnType
    -        wrapper.pythonUdfType = self.pythonUdfType
    -
    -        return wrapper
    -
    -
    -def _create_udf(f, returnType, pythonUdfType):
    -
    -    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    -        if pythonUdfType == PythonUdfType.PANDAS_UDF:
    -            import inspect
    -            argspec = inspect.getargspec(f)
    -            if len(argspec.args) == 0 and argspec.varargs is None:
    -                raise ValueError(
    -                    "0-arg pandas_udfs are not supported. "
    -                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    -                )
    -        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
    -        return udf_obj._wrapped()
    -
    -    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    -    if f is None or isinstance(f, (str, DataType)):
    -        # If DataType has been passed as a positional argument
    -        # for decorator use it as a returnType
    -        return_type = f or returnType
    -        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
    -    else:
    -        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
    +class PandasUDFType(enum.Enum):
    --- End diff --
    
    @HyukjinKwon We don't have to use enum34 - using it makes the code more a little bit cleaner and more "modern" but it's not absolutely necessary. If we don't have the python2.x backport issue, I'd argue we should use enum34. However, if adding such dependency is not desirable it makes sense to just use class attribute for simplicity.
    
    I can revert to just use int values as enum.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83853 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83853/testReport)** for PR 19630 at commit [`a741a05`](https://github.com/apache/spark/commit/a741a05ddfe4b0ee052ddbe7f3e6bd300e155a36).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83961/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148805057
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,133 +2049,18 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    -    """
    -    User defined function in Python
    -
    -    .. versionadded:: 1.3
    -    """
    -    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
    -        if not callable(func):
    -            raise TypeError(
    -                "Not a function or callable (__call__ is not defined): "
    -                "{0}".format(type(func)))
    -
    -        self.func = func
    -        self._returnType = returnType
    -        # Stores UserDefinedPythonFunctions jobj, once initialized
    -        self._returnType_placeholder = None
    -        self._judf_placeholder = None
    -        self._name = name or (
    -            func.__name__ if hasattr(func, '__name__')
    -            else func.__class__.__name__)
    -        self.pythonUdfType = pythonUdfType
    -
    -    @property
    -    def returnType(self):
    -        # This makes sure this is called after SparkContext is initialized.
    -        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    -        if self._returnType_placeholder is None:
    -            if isinstance(self._returnType, DataType):
    -                self._returnType_placeholder = self._returnType
    -            else:
    -                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    -        return self._returnType_placeholder
    -
    -    @property
    -    def _judf(self):
    -        # It is possible that concurrent access, to newly created UDF,
    -        # will initialize multiple UserDefinedPythonFunctions.
    -        # This is unlikely, doesn't affect correctness,
    -        # and should have a minimal performance impact.
    -        if self._judf_placeholder is None:
    -            self._judf_placeholder = self._create_judf()
    -        return self._judf_placeholder
    -
    -    def _create_judf(self):
    -        from pyspark.sql import SparkSession
    -
    -        spark = SparkSession.builder.getOrCreate()
    -        sc = spark.sparkContext
    -
    -        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    -        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    -        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    -            self._name, wrapped_func, jdt, self.pythonUdfType)
    -        return judf
    -
    -    def __call__(self, *cols):
    -        judf = self._judf
    -        sc = SparkContext._active_spark_context
    -        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    -
    -    def _wrapped(self):
    -        """
    -        Wrap this udf with a function and attach docstring from func
    -        """
    -
    -        # It is possible for a callable instance without __name__ attribute or/and
    -        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    -        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    -        # function. So, we take out these attribute names from the default names to set and
    -        # then manually assign it after being wrapped.
    -        assignments = tuple(
    -            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    -
    -        @functools.wraps(self.func, assigned=assignments)
    -        def wrapper(*args):
    -            return self(*args)
    -
    -        wrapper.__name__ = self._name
    -        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    -                              else self.func.__class__.__module__)
    -
    -        wrapper.func = self.func
    -        wrapper.returnType = self.returnType
    -        wrapper.pythonUdfType = self.pythonUdfType
    -
    -        return wrapper
    -
    -
    -def _create_udf(f, returnType, pythonUdfType):
    -
    -    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    -        if pythonUdfType == PythonUdfType.PANDAS_UDF:
    -            import inspect
    -            argspec = inspect.getargspec(f)
    -            if len(argspec.args) == 0 and argspec.varargs is None:
    -                raise ValueError(
    -                    "0-arg pandas_udfs are not supported. "
    -                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    -                )
    -        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
    -        return udf_obj._wrapped()
    -
    -    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    -    if f is None or isinstance(f, (str, DataType)):
    -        # If DataType has been passed as a positional argument
    -        # for decorator use it as a returnType
    -        return_type = f or returnType
    -        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
    -    else:
    -        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
    +class PandasUdfType(object):
    --- End diff --
    
    nit: `PandasUDFType`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83296/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83959 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83959/testReport)** for PR 19630 at commit [`cf1d1ca`](https://github.com/apache/spark/commit/cf1d1caa4f41c6bcf565cfc5b9e9901d94f56af3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151381770
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,88 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEquals(udf.returnType, DoubleType())
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    +        self.assertEquals(udf.returnType, DoubleType())
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
    +        self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, returnType='v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +    def test_pandas_udf_decorator(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        from pyspark.sql.types import StructType, StructField, DoubleType
    +
    +        @pandas_udf(DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, DoubleType())
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        @pandas_udf(returnType=DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, DoubleType())
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        schema = StructType([StructField("v", DoubleType())])
    +
    +        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, schema)
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, schema)
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, schema)
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +    def test_udf_wrong_arg(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(ValueError, 'return type'):
    +                @pandas_udf(PandasUDFType.GROUP_MAP)
    +                def foo(df):
    +                    return df
    +            with self.assertRaisesRegexp(ValueError, 'Invalid returnType'):
    +                @pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP)
    +                def foo(df):
    +                    return df
    +            with self.assertRaisesRegexp(ValueError, 'Invalid function'):
    +                @pandas_udf(returnType='id int, v double', functionType=PandasUDFType.GROUP_MAP)
    +                def foo(id, v):
    +                    return id
    --- End diff --
    
    Let's avoid shadow the built-in function, `id`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151399050
  
    --- Diff: python/pyspark/sql/group.py ---
    @@ -214,15 +214,15 @@ def apply(self, udf):
     
             :param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
     
    -        >>> from pyspark.sql.functions import pandas_udf
    +        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
             >>> df = spark.createDataFrame(
             ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
             ...     ("id", "v"))
    -        >>> @pandas_udf(returnType=df.schema)
    +        >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
    --- End diff --
    
    Yup .. it should be something for a followup anyway even it is somehow possible .. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151335091
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -137,15 +138,18 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
               udf.references.subsetOf(child.outputSet)
             }
             if (validUdfs.nonEmpty) {
    -          if (validUdfs.exists(_.pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF)) {
    -            throw new IllegalArgumentException("Can not use grouped vectorized UDFs")
    -          }
    +          require(validUdfs.forall(udf =>
    +            udf.evalType == PythonEvalType.SQL_BATCHED_UDF ||
    +            udf.evalType == PythonEvalType.PANDAS_SCALAR_UDF
    +          ), "Can only extract scalar vectorized udf or sql batch udf")
    --- End diff --
    
    ```require(validUdfs.forall(_.evalType != PythonEvalType.PANDAS_GROUP_MAP_UDF))```?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Thanks @HyukjinKwon!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19630


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83862/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83959 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83959/testReport)** for PR 19630 at commit [`cf1d1ca`](https://github.com/apache/spark/commit/cf1d1caa4f41c6bcf565cfc5b9e9901d94f56af3).
     * This patch **fails from timeout after a configured wait of \`250m\`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    generally LGTM, but need more discussion on the naming.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83899 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83899/testReport)** for PR 19630 at commit [`3af57d5`](https://github.com/apache/spark/commit/3af57d56a3e13359dae6578179085173924dab3e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83898 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83898/testReport)** for PR 19630 at commit [`658f41b`](https://github.com/apache/spark/commit/658f41b9bd2124fb3136c6f4a60352a216a2d0d3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    @HyukjinKwon Thanks for the reply on coverage. It'd be great to have an easy way to run coverage :)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150700736
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,132 +2051,13 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    -    """
    -    User defined function in Python
    -
    -    .. versionadded:: 1.3
    -    """
    -    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
    -        if not callable(func):
    -            raise TypeError(
    -                "Not a function or callable (__call__ is not defined): "
    -                "{0}".format(type(func)))
    -
    -        self.func = func
    -        self._returnType = returnType
    -        # Stores UserDefinedPythonFunctions jobj, once initialized
    -        self._returnType_placeholder = None
    -        self._judf_placeholder = None
    -        self._name = name or (
    -            func.__name__ if hasattr(func, '__name__')
    -            else func.__class__.__name__)
    -        self.pythonUdfType = pythonUdfType
    -
    -    @property
    -    def returnType(self):
    -        # This makes sure this is called after SparkContext is initialized.
    -        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    -        if self._returnType_placeholder is None:
    -            if isinstance(self._returnType, DataType):
    -                self._returnType_placeholder = self._returnType
    -            else:
    -                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    -        return self._returnType_placeholder
    -
    -    @property
    -    def _judf(self):
    -        # It is possible that concurrent access, to newly created UDF,
    -        # will initialize multiple UserDefinedPythonFunctions.
    -        # This is unlikely, doesn't affect correctness,
    -        # and should have a minimal performance impact.
    -        if self._judf_placeholder is None:
    -            self._judf_placeholder = self._create_judf()
    -        return self._judf_placeholder
    -
    -    def _create_judf(self):
    -        from pyspark.sql import SparkSession
    -
    -        spark = SparkSession.builder.getOrCreate()
    -        sc = spark.sparkContext
    -
    -        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    -        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    -        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    -            self._name, wrapped_func, jdt, self.pythonUdfType)
    -        return judf
    -
    -    def __call__(self, *cols):
    -        judf = self._judf
    -        sc = SparkContext._active_spark_context
    -        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    -
    -    def _wrapped(self):
    -        """
    -        Wrap this udf with a function and attach docstring from func
    -        """
    -
    -        # It is possible for a callable instance without __name__ attribute or/and
    -        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    -        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    -        # function. So, we take out these attribute names from the default names to set and
    -        # then manually assign it after being wrapped.
    -        assignments = tuple(
    -            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    -
    -        @functools.wraps(self.func, assigned=assignments)
    -        def wrapper(*args):
    -            return self(*args)
    -
    -        wrapper.__name__ = self._name
    -        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    -                              else self.func.__class__.__module__)
    -
    -        wrapper.func = self.func
    -        wrapper.returnType = self.returnType
    -        wrapper.pythonUdfType = self.pythonUdfType
    -
    -        return wrapper
    -
    -
    -def _create_udf(f, returnType, pythonUdfType):
    -
    -    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    -        if pythonUdfType == PythonUdfType.PANDAS_UDF:
    -            import inspect
    -            argspec = inspect.getargspec(f)
    -            if len(argspec.args) == 0 and argspec.varargs is None:
    -                raise ValueError(
    -                    "0-arg pandas_udfs are not supported. "
    -                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    -                )
    -        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
    -        return udf_obj._wrapped()
    -
    -    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    -    if f is None or isinstance(f, (str, DataType)):
    -        # If DataType has been passed as a positional argument
    -        # for decorator use it as a returnType
    -        return_type = f or returnType
    -        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
    -    else:
    -        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
    +class PandasUDFType(enum.Enum):
    --- End diff --
    
    I see. Yea, I would like to suggest to use class attributes for now. I don't want to add a dependency here .. We could separately discuss later if you feel strongly about this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151381238
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -0,0 +1,155 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +"""
    +User-defined function related classes and functions
    +"""
    +import functools
    +
    +from pyspark import SparkContext
    +from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType
    +from pyspark.sql.column import Column, _to_java_column, _to_seq
    +from pyspark.sql.types import StringType, DataType, StructType, _parse_datatype_string
    +
    +
    +def _wrap_function(sc, func, returnType):
    +    command = (func, returnType)
    +    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    +    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    +                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    +
    +
    +def _create_udf(f, returnType, evalType):
    +    if evalType == PythonEvalType.PANDAS_SCALAR_UDF:
    +        import inspect
    +        argspec = inspect.getargspec(f)
    +        if len(argspec.args) == 0 and argspec.varargs is None:
    +            raise ValueError(
    +                "Invalid function: 0-arg pandas_udfs are not supported. "
    +                "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    +            )
    +
    +    elif evalType == PythonEvalType.PANDAS_GROUP_MAP_UDF:
    +        import inspect
    +        argspec = inspect.getargspec(f)
    +        if len(argspec.args) != 1:
    +            raise ValueError(
    +                "Invalid function: pandas_udf with function type GROUP_MAP "
    +                "must take a single arg that is a pandas DataFrame."
    +            )
    +
    +    udf_obj = UserDefinedFunction(f, returnType=returnType, name=None, evalType=evalType)
    --- End diff --
    
    Maybe we should better remove `name=None` here and/or leave a small comment here or in L81 saying this name is set from the function.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83942/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151448056
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -34,9 +34,11 @@ import org.apache.spark.util._
      */
     private[spark] object PythonEvalType {
       val NON_UDF = 0
    -  val SQL_BATCHED_UDF = 1
    -  val SQL_PANDAS_UDF = 2
    -  val SQL_PANDAS_GROUPED_UDF = 3
    +
    +  val SQL_BATCHED_UDF = 100
    +
    +  val PANDAS_SCALAR_UDF = 200
    +  val PANDAS_GROUP_MAP_UDF = 201
    --- End diff --
    
    I don't mind changing it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151466298
  
    --- Diff: python/pyspark/worker.py ---
    @@ -89,6 +90,26 @@ def verify_result_length(*a):
         return lambda *a: (verify_result_length(*a), arrow_return_type)
     
     
    +def wrap_pandas_group_map_udf(f, return_type):
    +    def wrapped(*series):
    +        import pandas as pd
    +
    +        result = f(pd.concat(series, axis=1))
    --- End diff --
    
    Oh, I see. Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151042155
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2247,16 +2142,20 @@ def pandas_udf(f=None, returnType=StringType()):
            |         8|      JOHN DOE|          22|
            +----------+--------------+------------+
     
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    +    2. GROUP_MAP
     
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    +       A group map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
            The returnType should be a :class:`StructType` describing the schema of the returned
            `pandas.DataFrame`.
    +       The length of the returned `pandas.DataFrame` can arbitrary.
    --- End diff --
    
    nit: `can arbitrary` -> `can be arbitrary`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148805721
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2279,7 +2174,36 @@ def pandas_udf(f=None, returnType=StringType()):
     
         .. note:: The user-defined function must be deterministic.
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +    # decorator @pandas_udf(dataType(), functionType)
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +
    +        return_type = f or returnType
    +
    +        if return_type is None:
    +            raise ValueError("Must specify return type.")
    +
    +        if functionType is not None:
    +            # @pandas_udf(dataType, functionType=functionType)
    +            # @pandas_udf(returnType=dataType, functionType=functionType)
    +            udf_type = functionType
    +        elif returnType is not None and isinstance(returnType, int):
    --- End diff --
    
    Yes, when using `pandas_udf` as a decorate, the args are actually shifted by one position, i.e, with:
    
    `@pandas_udf('double', SCALAR)`
    it's actually:
    `f='double'` and `returnType=SCALAR`
    
    The most complication of the branching statement is because `pandas_udf` serves as both a decorate and a function


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83794 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83794/testReport)** for PR 19630 at commit [`464a4e8`](https://github.com/apache/spark/commit/464a4e899045c7ea022549cb6bf01a2c44d6989a).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151170727
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2247,16 +2142,20 @@ def pandas_udf(f=None, returnType=StringType()):
            |         8|      JOHN DOE|          22|
            +----------+--------------+------------+
     
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    +    2. GROUP_MAP
     
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    +       A group map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
            The returnType should be a :class:`StructType` describing the schema of the returned
            `pandas.DataFrame`.
    +       The length of the returned `pandas.DataFrame` can arbitrary.
    --- End diff --
    
    nice catch. Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    cc @cloud-fan @ueshin @HyukjinKwon 
    
    I still have some left items to do but I want to get feedback to make sure we are happy with the design. Please take a look, thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83956 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83956/testReport)** for PR 19630 at commit [`cf1d1ca`](https://github.com/apache/spark/commit/cf1d1caa4f41c6bcf565cfc5b9e9901d94f56af3).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83962 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83962/testReport)** for PR 19630 at commit [`cf1d1ca`](https://github.com/apache/spark/commit/cf1d1caa4f41c6bcf565cfc5b9e9901d94f56af3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150695345
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,132 +2051,13 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    -    """
    -    User defined function in Python
    -
    -    .. versionadded:: 1.3
    -    """
    -    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
    -        if not callable(func):
    -            raise TypeError(
    -                "Not a function or callable (__call__ is not defined): "
    -                "{0}".format(type(func)))
    -
    -        self.func = func
    -        self._returnType = returnType
    -        # Stores UserDefinedPythonFunctions jobj, once initialized
    -        self._returnType_placeholder = None
    -        self._judf_placeholder = None
    -        self._name = name or (
    -            func.__name__ if hasattr(func, '__name__')
    -            else func.__class__.__name__)
    -        self.pythonUdfType = pythonUdfType
    -
    -    @property
    -    def returnType(self):
    -        # This makes sure this is called after SparkContext is initialized.
    -        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    -        if self._returnType_placeholder is None:
    -            if isinstance(self._returnType, DataType):
    -                self._returnType_placeholder = self._returnType
    -            else:
    -                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    -        return self._returnType_placeholder
    -
    -    @property
    -    def _judf(self):
    -        # It is possible that concurrent access, to newly created UDF,
    -        # will initialize multiple UserDefinedPythonFunctions.
    -        # This is unlikely, doesn't affect correctness,
    -        # and should have a minimal performance impact.
    -        if self._judf_placeholder is None:
    -            self._judf_placeholder = self._create_judf()
    -        return self._judf_placeholder
    -
    -    def _create_judf(self):
    -        from pyspark.sql import SparkSession
    -
    -        spark = SparkSession.builder.getOrCreate()
    -        sc = spark.sparkContext
    -
    -        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    -        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    -        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    -            self._name, wrapped_func, jdt, self.pythonUdfType)
    -        return judf
    -
    -    def __call__(self, *cols):
    -        judf = self._judf
    -        sc = SparkContext._active_spark_context
    -        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    -
    -    def _wrapped(self):
    -        """
    -        Wrap this udf with a function and attach docstring from func
    -        """
    -
    -        # It is possible for a callable instance without __name__ attribute or/and
    -        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    -        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    -        # function. So, we take out these attribute names from the default names to set and
    -        # then manually assign it after being wrapped.
    -        assignments = tuple(
    -            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    -
    -        @functools.wraps(self.func, assigned=assignments)
    -        def wrapper(*args):
    -            return self(*args)
    -
    -        wrapper.__name__ = self._name
    -        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    -                              else self.func.__class__.__module__)
    -
    -        wrapper.func = self.func
    -        wrapper.returnType = self.returnType
    -        wrapper.pythonUdfType = self.pythonUdfType
    -
    -        return wrapper
    -
    -
    -def _create_udf(f, returnType, pythonUdfType):
    -
    -    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    -        if pythonUdfType == PythonUdfType.PANDAS_UDF:
    -            import inspect
    -            argspec = inspect.getargspec(f)
    -            if len(argspec.args) == 0 and argspec.varargs is None:
    -                raise ValueError(
    -                    "0-arg pandas_udfs are not supported. "
    -                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    -                )
    -        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
    -        return udf_obj._wrapped()
    -
    -    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    -    if f is None or isinstance(f, (str, DataType)):
    -        # If DataType has been passed as a positional argument
    -        # for decorator use it as a returnType
    -        return_type = f or returnType
    -        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
    -    else:
    -        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
    +class PandasUDFType(enum.Enum):
    --- End diff --
    
    Oh, I think we should definitely avoid adding a dependency .. I think that `setup.py` should only be applied to pip installation and I believe we need to have a manual dependency control in Python, for example, Py4J if I haven't missed any recent change or anything .. (see `./spark/python/lib`).
    
    I plan to take a closer look related with it soon after we have what to consider as hard dependencies (for example, PyArrow, Pandas and etc).
    
    > What's the common way to implement enum in python?
    
    Up to my knowledge, just use class attributes is common.
    
    > This is in python built-in library after python3.4. It's not built-in for python2.x, but available through "enum34" package.
    >
    > There are other ways to implement enum without enum34, but all of them are either complicated (using type()) or not as type-safe (using ints).
    
    I think here type-safe is not a concern I believe .. do we really need enum here? sounds like a overkill even if it does not need the manual backport for Python 2.
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83906 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83906/testReport)** for PR 19630 at commit [`565ba23`](https://github.com/apache/spark/commit/565ba237d41d11d8b73b0a79d39d8b983dcc51f2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151448212
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2208,26 +2089,39 @@ def udf(f=None, returnType=StringType()):
         |         8|      JOHN DOE|          22|
         +----------+--------------+------------+
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.NORMAL_UDF)
    +    # decorator @udf, @udf(), @udf(dataType())
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +        return_type = f or returnType
    +        return functools.partial(_create_udf, returnType=return_type,
    +                                 evalType=PythonEvalType.SQL_BATCHED_UDF)
    +    else:
    +        return _create_udf(f=f, returnType=returnType,
    +                           evalType=PythonEvalType.SQL_BATCHED_UDF)
     
     
     @since(2.3)
    -def pandas_udf(f=None, returnType=StringType()):
    +def pandas_udf(f=None, returnType=None, functionType=None):
         """
         Creates a vectorized user defined function (UDF).
     
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
    +    :param functionType: an enum value in :class:`pyspark.sql.functions.PandasUdfType`.
    --- End diff --
    
    Good catch. Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83906/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151524279
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,92 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, returnType='v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_pandas_udf_decorator(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        from pyspark.sql.types import StructType, StructField, DoubleType
    +
    +        @pandas_udf(DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        @pandas_udf(returnType=DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        schema = StructType([StructField("v", DoubleType())])
    +
    +        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_udf_wrong_arg(self):
    --- End diff --
    
    can we also add some test for `SCALAR_MAP` for the error case?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Ping @cloud-fan @ueshin @HyukjinKwon any thoughts on this? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83701/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83699/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r148810140
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -0,0 +1,136 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +"""
    +User-defined function related classes and functions
    --- End diff --
    
    This is just code move around?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150311258
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,133 +2049,18 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    -    """
    -    User defined function in Python
    -
    -    .. versionadded:: 1.3
    -    """
    -    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
    -        if not callable(func):
    -            raise TypeError(
    -                "Not a function or callable (__call__ is not defined): "
    -                "{0}".format(type(func)))
    -
    -        self.func = func
    -        self._returnType = returnType
    -        # Stores UserDefinedPythonFunctions jobj, once initialized
    -        self._returnType_placeholder = None
    -        self._judf_placeholder = None
    -        self._name = name or (
    -            func.__name__ if hasattr(func, '__name__')
    -            else func.__class__.__name__)
    -        self.pythonUdfType = pythonUdfType
    -
    -    @property
    -    def returnType(self):
    -        # This makes sure this is called after SparkContext is initialized.
    -        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    -        if self._returnType_placeholder is None:
    -            if isinstance(self._returnType, DataType):
    -                self._returnType_placeholder = self._returnType
    -            else:
    -                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    -        return self._returnType_placeholder
    -
    -    @property
    -    def _judf(self):
    -        # It is possible that concurrent access, to newly created UDF,
    -        # will initialize multiple UserDefinedPythonFunctions.
    -        # This is unlikely, doesn't affect correctness,
    -        # and should have a minimal performance impact.
    -        if self._judf_placeholder is None:
    -            self._judf_placeholder = self._create_judf()
    -        return self._judf_placeholder
    -
    -    def _create_judf(self):
    -        from pyspark.sql import SparkSession
    -
    -        spark = SparkSession.builder.getOrCreate()
    -        sc = spark.sparkContext
    -
    -        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    -        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    -        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    -            self._name, wrapped_func, jdt, self.pythonUdfType)
    -        return judf
    -
    -    def __call__(self, *cols):
    -        judf = self._judf
    -        sc = SparkContext._active_spark_context
    -        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    -
    -    def _wrapped(self):
    -        """
    -        Wrap this udf with a function and attach docstring from func
    -        """
    -
    -        # It is possible for a callable instance without __name__ attribute or/and
    -        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    -        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    -        # function. So, we take out these attribute names from the default names to set and
    -        # then manually assign it after being wrapped.
    -        assignments = tuple(
    -            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    -
    -        @functools.wraps(self.func, assigned=assignments)
    -        def wrapper(*args):
    -            return self(*args)
    -
    -        wrapper.__name__ = self._name
    -        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    -                              else self.func.__class__.__module__)
    -
    -        wrapper.func = self.func
    -        wrapper.returnType = self.returnType
    -        wrapper.pythonUdfType = self.pythonUdfType
    -
    -        return wrapper
    -
    -
    -def _create_udf(f, returnType, pythonUdfType):
    -
    -    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    -        if pythonUdfType == PythonUdfType.PANDAS_UDF:
    -            import inspect
    -            argspec = inspect.getargspec(f)
    -            if len(argspec.args) == 0 and argspec.varargs is None:
    -                raise ValueError(
    -                    "0-arg pandas_udfs are not supported. "
    -                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    -                )
    -        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
    -        return udf_obj._wrapped()
    -
    -    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    -    if f is None or isinstance(f, (str, DataType)):
    -        # If DataType has been passed as a positional argument
    -        # for decorator use it as a returnType
    -        return_type = f or returnType
    -        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
    -    else:
    -        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
    +class PandasUdfType(object):
    --- End diff --
    
    Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83709/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83862 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83862/testReport)** for PR 19630 at commit [`bdd3e7a`](https://github.com/apache/spark/commit/bdd3e7ab62b158abc301c21f5199be8174af8fb9).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151333004
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2208,26 +2089,39 @@ def udf(f=None, returnType=StringType()):
         |         8|      JOHN DOE|          22|
         +----------+--------------+------------+
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.NORMAL_UDF)
    +    # decorator @udf, @udf(), @udf(dataType())
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +        return_type = f or returnType
    +        return functools.partial(_create_udf, returnType=return_type,
    +                                 evalType=PythonEvalType.SQL_BATCHED_UDF)
    +    else:
    +        return _create_udf(f=f, returnType=returnType,
    +                           evalType=PythonEvalType.SQL_BATCHED_UDF)
     
     
     @since(2.3)
    -def pandas_udf(f=None, returnType=StringType()):
    +def pandas_udf(f=None, returnType=None, functionType=None):
         """
         Creates a vectorized user defined function (UDF).
     
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
    +    :param functionType: an enum value in :class:`pyspark.sql.functions.PandasUdfType`.
    --- End diff --
    
    `PandasUdfType` -> `PandasUDFType`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83965 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83965/testReport)** for PR 19630 at commit [`cf1d1ca`](https://github.com/apache/spark/commit/cf1d1caa4f41c6bcf565cfc5b9e9901d94f56af3).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150696878
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,132 +2051,13 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    -    """
    -    User defined function in Python
    -
    -    .. versionadded:: 1.3
    -    """
    -    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
    -        if not callable(func):
    -            raise TypeError(
    -                "Not a function or callable (__call__ is not defined): "
    -                "{0}".format(type(func)))
    -
    -        self.func = func
    -        self._returnType = returnType
    -        # Stores UserDefinedPythonFunctions jobj, once initialized
    -        self._returnType_placeholder = None
    -        self._judf_placeholder = None
    -        self._name = name or (
    -            func.__name__ if hasattr(func, '__name__')
    -            else func.__class__.__name__)
    -        self.pythonUdfType = pythonUdfType
    -
    -    @property
    -    def returnType(self):
    -        # This makes sure this is called after SparkContext is initialized.
    -        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    -        if self._returnType_placeholder is None:
    -            if isinstance(self._returnType, DataType):
    -                self._returnType_placeholder = self._returnType
    -            else:
    -                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    -        return self._returnType_placeholder
    -
    -    @property
    -    def _judf(self):
    -        # It is possible that concurrent access, to newly created UDF,
    -        # will initialize multiple UserDefinedPythonFunctions.
    -        # This is unlikely, doesn't affect correctness,
    -        # and should have a minimal performance impact.
    -        if self._judf_placeholder is None:
    -            self._judf_placeholder = self._create_judf()
    -        return self._judf_placeholder
    -
    -    def _create_judf(self):
    -        from pyspark.sql import SparkSession
    -
    -        spark = SparkSession.builder.getOrCreate()
    -        sc = spark.sparkContext
    -
    -        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    -        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    -        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    -            self._name, wrapped_func, jdt, self.pythonUdfType)
    -        return judf
    -
    -    def __call__(self, *cols):
    -        judf = self._judf
    -        sc = SparkContext._active_spark_context
    -        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    -
    -    def _wrapped(self):
    -        """
    -        Wrap this udf with a function and attach docstring from func
    -        """
    -
    -        # It is possible for a callable instance without __name__ attribute or/and
    -        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    -        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    -        # function. So, we take out these attribute names from the default names to set and
    -        # then manually assign it after being wrapped.
    -        assignments = tuple(
    -            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    -
    -        @functools.wraps(self.func, assigned=assignments)
    -        def wrapper(*args):
    -            return self(*args)
    -
    -        wrapper.__name__ = self._name
    -        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    -                              else self.func.__class__.__module__)
    -
    -        wrapper.func = self.func
    -        wrapper.returnType = self.returnType
    -        wrapper.pythonUdfType = self.pythonUdfType
    -
    -        return wrapper
    -
    -
    -def _create_udf(f, returnType, pythonUdfType):
    -
    -    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    -        if pythonUdfType == PythonUdfType.PANDAS_UDF:
    -            import inspect
    -            argspec = inspect.getargspec(f)
    -            if len(argspec.args) == 0 and argspec.varargs is None:
    -                raise ValueError(
    -                    "0-arg pandas_udfs are not supported. "
    -                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    -                )
    -        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
    -        return udf_obj._wrapped()
    -
    -    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    -    if f is None or isinstance(f, (str, DataType)):
    -        # If DataType has been passed as a positional argument
    -        # for decorator use it as a returnType
    -        return_type = f or returnType
    -        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
    -    else:
    -        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
    +class PandasUDFType(enum.Enum):
    --- End diff --
    
    I can't currently take a closer look right now but please let me know @icexelloss if I missed your intention. Let me take a closer look tonight.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    which, for example, shows the coverage like this:
    
    ![2017-11-17 4 53 16](https://user-images.githubusercontent.com/6477701/32936608-47f1fefa-cbb8-11e7-965d-f19119a8314b.png)
    
    The difficulty is to enable it for workers too ... if anyone does not know the easy way let me share mine .. I am ashamed to share it actually .. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151605081
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,92 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    --- End diff --
    
    Added test


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151523879
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,92 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, returnType='v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_pandas_udf_decorator(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        from pyspark.sql.types import StructType, StructField, DoubleType
    +
    +        @pandas_udf(DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        @pandas_udf(returnType=DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        schema = StructType([StructField("v", DoubleType())])
    +
    +        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_udf_wrong_arg(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(ValueError, 'return type'):
    +                @pandas_udf(PandasUDFType.GROUP_MAP)
    +                def foo(df):
    +                    return df
    +            with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
    +                @pandas_udf(returnType=PandasUDFType.GROUP_MAP)
    +                def foo(df):
    +                    return df
    +            with self.assertRaisesRegexp(ValueError, 'Invalid returnType'):
    +                @pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP)
    --- End diff --
    
    oh i see


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    As a side question, is there an easy way to run coverage reports with pyspark tests?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83709 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83709/testReport)** for PR 19630 at commit [`94cded6`](https://github.com/apache/spark/commit/94cded64333956933f82fdc298d486a9aa49347c).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150311217
  
    --- Diff: python/pyspark/sql/group.py ---
    @@ -260,19 +259,19 @@ def wrapped(*cols):
                 import pandas as pd
                 result = func(pd.concat(cols, axis=1, keys=columns))
                 if not isinstance(result, pd.DataFrame):
    -                raise TypeError("Return type of the user-defined function should be "
    -                                "Pandas.DataFrame, but is {}".format(type(result)))
    +                raise TypeError('Return type of the user-defined function should be '
    --- End diff --
    
    Reverted.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Thanks @cloud-fan. I will do some clean up / add more tests today.
    
    I might need some help to get the build to pass because I added enum34 as a dependency. cc @HyukjinKwon I suppose we need to update the python environment in Jenkins?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83701 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83701/testReport)** for PR 19630 at commit [`75d31b4`](https://github.com/apache/spark/commit/75d31b470e7072fd284124f6a6beb679ff7de9a5).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151332296
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -34,9 +34,11 @@ import org.apache.spark.util._
      */
     private[spark] object PythonEvalType {
       val NON_UDF = 0
    -  val SQL_BATCHED_UDF = 1
    -  val SQL_PANDAS_UDF = 2
    -  val SQL_PANDAS_GROUPED_UDF = 3
    +
    +  val SQL_BATCHED_UDF = 100
    +
    +  val PANDAS_SCALAR_UDF = 200
    +  val PANDAS_GROUP_MAP_UDF = 201
    --- End diff --
    
    nit: Is it better to keep `SQL` prefix?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83698/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83962 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83962/testReport)** for PR 19630 at commit [`cf1d1ca`](https://github.com/apache/spark/commit/cf1d1caa4f41c6bcf565cfc5b9e9901d94f56af3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151331041
  
    --- Diff: python/pyspark/worker.py ---
    @@ -89,6 +90,26 @@ def verify_result_length(*a):
         return lambda *a: (verify_result_length(*a), arrow_return_type)
     
     
    +def wrap_pandas_group_map_udf(f, return_type):
    +    def wrapped(*series):
    +        import pandas as pd
    +
    +        result = f(pd.concat(series, axis=1))
    --- End diff --
    
    Can we remove `keys=columns`? The original includes it as the 3rd argument.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150311170
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
    @@ -23,14 +23,15 @@ import scala.collection.JavaConverters._
     import scala.language.implicitConversions
     
     import org.apache.spark.annotation.InterfaceStability
    +import org.apache.spark.api.python.PythonEvalType
     import org.apache.spark.broadcast.Broadcast
     import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction}
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.expressions.aggregate._
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.util.toPrettySQL
     import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
    -import org.apache.spark.sql.execution.python.{PythonUDF, PythonUdfType}
    +import org.apache.spark.sql.execution.python.{PythonUDF}
    --- End diff --
    
    Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151476071
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,88 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEquals(udf.returnType, DoubleType())
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    +        self.assertEquals(udf.returnType, DoubleType())
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
    +        self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, returnType='v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +    def test_pandas_udf_decorator(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        from pyspark.sql.types import StructType, StructField, DoubleType
    +
    +        @pandas_udf(DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, DoubleType())
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        @pandas_udf(returnType=DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, DoubleType())
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_SCALAR_UDF)
    +
    +        schema = StructType([StructField("v", DoubleType())])
    +
    +        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, schema)
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, schema)
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEquals(foo.returnType, schema)
    +        self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF)
    +
    +    def test_udf_wrong_arg(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(ValueError, 'return type'):
    +                @pandas_udf(PandasUDFType.GROUP_MAP)
    --- End diff --
    
    Not insane at all :) Added the test.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r149873412
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
    @@ -23,14 +23,15 @@ import scala.collection.JavaConverters._
     import scala.language.implicitConversions
     
     import org.apache.spark.annotation.InterfaceStability
    +import org.apache.spark.api.python.PythonEvalType
     import org.apache.spark.broadcast.Broadcast
     import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction}
     import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.expressions.aggregate._
     import org.apache.spark.sql.catalyst.plans.logical._
     import org.apache.spark.sql.catalyst.util.toPrettySQL
     import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
    -import org.apache.spark.sql.execution.python.{PythonUDF, PythonUdfType}
    +import org.apache.spark.sql.execution.python.{PythonUDF}
    --- End diff --
    
    We can remove the braces here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151489959
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,92 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, returnType='v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_pandas_udf_decorator(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        from pyspark.sql.types import StructType, StructField, DoubleType
    +
    +        @pandas_udf(DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        @pandas_udf(returnType=DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        schema = StructType([StructField("v", DoubleType())])
    +
    +        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_udf_wrong_arg(self):
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        with QuietTest(self.sc):
    +            with self.assertRaisesRegexp(ValueError, 'return type'):
    +                @pandas_udf(PandasUDFType.GROUP_MAP)
    +                def foo(df):
    +                    return df
    +            with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
    +                @pandas_udf(returnType=PandasUDFType.GROUP_MAP)
    +                def foo(df):
    +                    return df
    +            with self.assertRaisesRegexp(ValueError, 'Invalid returnType'):
    +                @pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP)
    --- End diff --
    
    it's a little weird that we accept `DoubleType` but not `double`. If it's an existing issue, we can address it later.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151476197
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -56,6 +56,20 @@
     __all__ = ["RDD"]
     
     
    +class PythonEvalType(object):
    +    """
    +    Evaluation type of python rdd.
    +
    +    These values are internal to PySpark.
    --- End diff --
    
    Added


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151333227
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -56,6 +56,20 @@
     __all__ = ["RDD"]
     
     
    +class PythonEvalType(object):
    +    """
    +    Evaluation type of python rdd.
    +
    +    These values are internal to PySpark.
    --- End diff --
    
    And should match with enum values in `org.apache.spark.api.python.PythonEvalType`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151476372
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -34,9 +34,11 @@ import org.apache.spark.util._
      */
     private[spark] object PythonEvalType {
       val NON_UDF = 0
    -  val SQL_BATCHED_UDF = 1
    -  val SQL_PANDAS_UDF = 2
    -  val SQL_PANDAS_GROUPED_UDF = 3
    +
    +  val SQL_BATCHED_UDF = 100
    +
    +  val PANDAS_SCALAR_UDF = 200
    +  val PANDAS_GROUP_MAP_UDF = 201
    --- End diff --
    
    Added SQL prefix


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r149873461
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -0,0 +1,136 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +"""
    +User-defined function related classes and functions
    +"""
    +import functools
    +
    +from pyspark import SparkContext
    +from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType
    +from pyspark.sql.column import Column, _to_java_column, _to_seq
    +from pyspark.sql.types import StringType, DataType, _parse_datatype_string
    +
    +
    +def _wrap_function(sc, func, returnType):
    +    command = (func, returnType)
    +    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    +    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    +                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    +
    +
    +def _create_udf(f, *, returnType, udfType):
    +    if udfType in (PythonEvalType.PANDAS_SCALAR_UDF, PythonEvalType.PANDAS_GROUP_FLATMAP_UDF):
    +        import inspect
    +        argspec = inspect.getargspec(f)
    +        if len(argspec.args) == 0 and argspec.varargs is None:
    +            raise ValueError(
    +                "0-arg pandas_udfs are not supported. "
    +                "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    +            )
    +    udf_obj = UserDefinedFunction(f, returnType=returnType, name=None, udfType=udfType)
    +    return udf_obj._wrapped()
    +
    +
    +class UserDefinedFunction(object):
    +    """
    +    User defined function in Python
    +
    +    .. versionadded:: 1.3
    +    """
    +    def __init__(self, func,
    +                 returnType=StringType(), name=None,
    +                 udfType=PythonEvalType.SQL_BATCHED_UDF):
    +        if not callable(func):
    +            raise TypeError(
    +                "Not a function or callable (__call__ is not defined): "
    +                "{0}".format(type(func)))
    +
    +        self.func = func
    +        self._returnType = returnType
    +        # Stores UserDefinedPythonFunctions jobj, once initialized
    +        self._returnType_placeholder = None
    +        self._judf_placeholder = None
    +        self._name = name or (
    +            func.__name__ if hasattr(func, '__name__')
    +            else func.__class__.__name__)
    +        self.udfType = udfType
    +
    +
    +    @property
    +    def returnType(self):
    +        # This makes sure this is called after SparkContext is initialized.
    +        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    +        if self._returnType_placeholder is None:
    +            if isinstance(self._returnType, DataType):
    +                self._returnType_placeholder = self._returnType
    +            else:
    +                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    +        return self._returnType_placeholder
    +
    +    @property
    +    def _judf(self):
    +        # It is possible that concurrent access, to newly created UDF,
    +        # will initialize multiple UserDefinedPythonFunctions.
    +        # This is unlikely, doesn't affect correctness,
    +        # and should have a minimal performance impact.
    +        if self._judf_placeholder is None:
    +            self._judf_placeholder = self._create_judf()
    +        return self._judf_placeholder
    +
    +    def _create_judf(self):
    +        from pyspark.sql import SparkSession
    +
    +        spark = SparkSession.builder.getOrCreate()
    +        sc = spark.sparkContext
    +
    +        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    +        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    +        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    +            self._name, wrapped_func, jdt, self.udfType)
    +        return judf
    +
    +    def __call__(self, *cols):
    +        judf = self._judf
    +        sc = SparkContext._active_spark_context
    +        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    +
    +    def _wrapped(self):
    +        """
    +        Wrap this udf with a function and attach docstring from func
    +        """
    +
    +        # It is possible for a callable instance without __name__ attribute or/and
    +        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    +        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    +        # function. So, we take out these attribute names from the default names to set and
    +        # then manually assign it after being wrapped.
    +        assignments = tuple(
    +            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    +
    +        @functools.wraps(self.func, assigned=assignments)
    +        def wrapper(*args):
    +            return self(*args)
    +
    +        wrapper.__name__ = self._name
    +        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    +                              else self.func.__class__.__module__)
    +
    +        wrapper.func = self.func
    +        wrapper.returnType = self.returnType
    +        wrapper.udfType = self.udfType
    +
    +        return wrapper
    --- End diff --
    
    Shall we add a blank line at the end of file?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151605217
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3166,6 +3166,92 @@ def test_filtered_frame(self):
             self.assertTrue(pdf.empty)
     
     
    +class PandasUDFTests(ReusedSQLTestCase):
    +    def test_pandas_udf_basic(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +
    +        udf = pandas_udf(lambda x: x, DoubleType())
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
    +        self.assertEqual(udf.returnType, DoubleType())
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, 'v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        udf = pandas_udf(lambda x: x, returnType='v double',
    +                         functionType=PandasUDFType.GROUP_MAP)
    +        self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
    +        self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_pandas_udf_decorator(self):
    +        from pyspark.rdd import PythonEvalType
    +        from pyspark.sql.functions import pandas_udf, PandasUDFType
    +        from pyspark.sql.types import StructType, StructField, DoubleType
    +
    +        @pandas_udf(DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        @pandas_udf(returnType=DoubleType())
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, DoubleType())
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
    +
    +        schema = StructType([StructField("v", DoubleType())])
    +
    +        @pandas_udf(schema, PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +        @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
    +        def foo(x):
    +            return x
    +        self.assertEqual(foo.returnType, schema)
    +        self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
    +
    +    def test_udf_wrong_arg(self):
    --- End diff --
    
    I added a few more tests. Please let me know if you have specific tests in mind.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83942 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83942/testReport)** for PR 19630 at commit [`1f2c47b`](https://github.com/apache/spark/commit/1f2c47b569bcfa3f7ca7f974fee7cbdc21969623).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83709 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83709/testReport)** for PR 19630 at commit [`94cded6`](https://github.com/apache/spark/commit/94cded64333956933f82fdc298d486a9aa49347c).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    overall LGTM, but let's not add udf types that are not implemented yet, like the AGGREGATE


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    LGTM


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83965 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83965/testReport)** for PR 19630 at commit [`cf1d1ca`](https://github.com/apache/spark/commit/cf1d1caa4f41c6bcf565cfc5b9e9901d94f56af3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Looks clean and pretty solid in general. Let me take another look to double check, probably, within this weekend and maybe I will leave it to @ueshin if I can take the look ahead.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151447943
  
    --- Diff: python/pyspark/worker.py ---
    @@ -89,6 +90,26 @@ def verify_result_length(*a):
         return lambda *a: (verify_result_length(*a), arrow_return_type)
     
     
    +def wrap_pandas_group_map_udf(f, return_type):
    +    def wrapped(*series):
    +        import pandas as pd
    +
    +        result = f(pd.concat(series, axis=1))
    --- End diff --
    
    series itself has a name attribute. `pd.concat` will use the name attribute of series to be the column name in dataframe.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83853 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83853/testReport)** for PR 19630 at commit [`a741a05`](https://github.com/apache/spark/commit/a741a05ddfe4b0ee052ddbe7f3e6bd300e155a36).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Actually, R has a flag for `useDaemon`:
    
    https://github.com/apache/spark/blob/478fbc866fbfdb4439788583281863ecea14e8af/core/src/main/scala/org/apache/spark/api/r/RRunner.scala#L362
    
    It'd be great if we have this flag too. It makes easier to test Windows specific issue too .. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151676061
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,132 +2050,12 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    --- End diff --
    
    So moving this will probably break some peoples code.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83899/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150963595
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2279,7 +2172,38 @@ def pandas_udf(f=None, returnType=StringType()):
     
         .. note:: The user-defined function must be deterministic.
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +    # decorator @pandas_udf(dataType(), functionType)
    +    if f is None or isinstance(f, (str, DataType)):
    +        # If DataType has been passed as a positional argument
    +        # for decorator use it as a returnType
    +
    +        return_type = f or returnType
    +
    +        if return_type is None:
    +            raise ValueError("Must specify return type.")
    +
    +        if functionType is not None:
    +            # @pandas_udf(dataType, functionType=functionType)
    +            # @pandas_udf(returnType=dataType, functionType=functionType)
    +            eval_type = functionType.value
    +        elif returnType is not None and isinstance(returnType, PandasUDFType):
    +            # @pandas_udf(dataType, functionType)
    +            eval_type = returnType.value
    +        else:
    +            # @pandas_udf(dataType) or @pandas_udf(returnType=dataType)
    --- End diff --
    
    Yes, there is a test for this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    thanks, merging to master, cheers!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151377556
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2271,15 +2169,42 @@ def pandas_udf(f=None, returnType=StringType()):
            |  2| 1.1094003924504583|
            +---+-------------------+
     
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
            .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
     
         .. note:: The user-defined function must be deterministic.
         """
    -    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +    # decorator @pandas_udf(dataType(), functionType)
    +    if f is None or isinstance(f, (str, DataType)):
    --- End diff --
    
    just for curious, when `f` will be none?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83962/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83898 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83898/testReport)** for PR 19630 at commit [`658f41b`](https://github.com/apache/spark/commit/658f41b9bd2124fb3136c6f4a60352a216a2d0d3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r151677913
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,132 +2050,12 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    --- End diff --
    
    Yup, I noticed it first too when I reviewed but then noticed he imported this indentedly:
    
    https://github.com/icexelloss/spark/blob/cf1d1caa4f41c6bcf565cfc5b9e9901d94f56af3/python/pyspark/sql/functions.py#L35
    
    So, I guess it could be fine. I manually just double checked:
    
    ```python
    >>> from pyspark.sql import functions
    >>> functions.UserDefinedFunction
    <class 'pyspark.sql.udf.UserDefinedFunction'>
    >>> from pyspark import sql
    >>> sql.functions.UserDefinedFunction
    <class 'pyspark.sql.udf.UserDefinedFunction'>
    >>> from pyspark.sql.functions import UserDefinedFunction
    >>> from pyspark.sql.udf import UserDefinedFunction
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19630: wip: [SPARK-22409] Introduce function type argume...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150648631
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2049,132 +2051,13 @@ def map_values(col):
     
     # ---------------------------- User Defined Function ----------------------------------
     
    -def _wrap_function(sc, func, returnType):
    -    command = (func, returnType)
    -    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    -    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
    -                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
    -
    -
    -class PythonUdfType(object):
    -    # row-at-a-time UDFs
    -    NORMAL_UDF = 0
    -    # scalar vectorized UDFs
    -    PANDAS_UDF = 1
    -    # grouped vectorized UDFs
    -    PANDAS_GROUPED_UDF = 2
    -
    -
    -class UserDefinedFunction(object):
    -    """
    -    User defined function in Python
    -
    -    .. versionadded:: 1.3
    -    """
    -    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
    -        if not callable(func):
    -            raise TypeError(
    -                "Not a function or callable (__call__ is not defined): "
    -                "{0}".format(type(func)))
    -
    -        self.func = func
    -        self._returnType = returnType
    -        # Stores UserDefinedPythonFunctions jobj, once initialized
    -        self._returnType_placeholder = None
    -        self._judf_placeholder = None
    -        self._name = name or (
    -            func.__name__ if hasattr(func, '__name__')
    -            else func.__class__.__name__)
    -        self.pythonUdfType = pythonUdfType
    -
    -    @property
    -    def returnType(self):
    -        # This makes sure this is called after SparkContext is initialized.
    -        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
    -        if self._returnType_placeholder is None:
    -            if isinstance(self._returnType, DataType):
    -                self._returnType_placeholder = self._returnType
    -            else:
    -                self._returnType_placeholder = _parse_datatype_string(self._returnType)
    -        return self._returnType_placeholder
    -
    -    @property
    -    def _judf(self):
    -        # It is possible that concurrent access, to newly created UDF,
    -        # will initialize multiple UserDefinedPythonFunctions.
    -        # This is unlikely, doesn't affect correctness,
    -        # and should have a minimal performance impact.
    -        if self._judf_placeholder is None:
    -            self._judf_placeholder = self._create_judf()
    -        return self._judf_placeholder
    -
    -    def _create_judf(self):
    -        from pyspark.sql import SparkSession
    -
    -        spark = SparkSession.builder.getOrCreate()
    -        sc = spark.sparkContext
    -
    -        wrapped_func = _wrap_function(sc, self.func, self.returnType)
    -        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    -        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
    -            self._name, wrapped_func, jdt, self.pythonUdfType)
    -        return judf
    -
    -    def __call__(self, *cols):
    -        judf = self._judf
    -        sc = SparkContext._active_spark_context
    -        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
    -
    -    def _wrapped(self):
    -        """
    -        Wrap this udf with a function and attach docstring from func
    -        """
    -
    -        # It is possible for a callable instance without __name__ attribute or/and
    -        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    -        # we should avoid wrapping the attributes from the wrapped function to the wrapper
    -        # function. So, we take out these attribute names from the default names to set and
    -        # then manually assign it after being wrapped.
    -        assignments = tuple(
    -            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
    -
    -        @functools.wraps(self.func, assigned=assignments)
    -        def wrapper(*args):
    -            return self(*args)
    -
    -        wrapper.__name__ = self._name
    -        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
    -                              else self.func.__class__.__module__)
    -
    -        wrapper.func = self.func
    -        wrapper.returnType = self.returnType
    -        wrapper.pythonUdfType = self.pythonUdfType
    -
    -        return wrapper
    -
    -
    -def _create_udf(f, returnType, pythonUdfType):
    -
    -    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    -        if pythonUdfType == PythonUdfType.PANDAS_UDF:
    -            import inspect
    -            argspec = inspect.getargspec(f)
    -            if len(argspec.args) == 0 and argspec.varargs is None:
    -                raise ValueError(
    -                    "0-arg pandas_udfs are not supported. "
    -                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    -                )
    -        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
    -        return udf_obj._wrapped()
    -
    -    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    -    if f is None or isinstance(f, (str, DataType)):
    -        # If DataType has been passed as a positional argument
    -        # for decorator use it as a returnType
    -        return_type = f or returnType
    -        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
    -    else:
    -        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
    +class PandasUDFType(enum.Enum):
    --- End diff --
    
    I'm surprised this is not in python built-in library. What's the common way to implement enum in python? cc @HyukjinKwon 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19630: wip: [SPARK-22409] Introduce function type argument in p...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19630
  
    **[Test build #83701 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83701/testReport)** for PR 19630 at commit [`75d31b4`](https://github.com/apache/spark/commit/75d31b470e7072fd284124f6a6beb679ff7de9a5).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org