Posted to reviews@spark.apache.org by ueshin <gi...@git.apache.org> on 2017/10/16 07:08:20 UTC

[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

GitHub user ueshin opened a pull request:

    https://github.com/apache/spark/pull/19505

    [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply() with pandas udf

    ## What changes were proposed in this pull request?
    
    This is a follow-up of #18732.
    This PR introduces a `@pandas_grouped_udf` decorator for grouped vectorized UDFs instead of reusing the `@pandas_udf` decorator.
    
    ## How was this patch tested?
    
    Existing tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ueshin/apache-spark issues/SPARK-20396/fup1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19505.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19505
    
----
commit 4d2bd959e1eeabb4f72cfbb52a374ce721030507
Author: Takuya UESHIN <ue...@databricks.com>
Date:   2017-10-16T06:45:55Z

    Introduce `@pandas_grouped_udf` decorator for grouped vectorized UDF.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144848936
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2121,33 +2127,35 @@ def wrapper(*args):
     
             wrapper.func = self.func
             wrapper.returnType = self.returnType
    -        wrapper.vectorized = self.vectorized
    +        wrapper.pythonUdfType = self.pythonUdfType
     
             return wrapper
     
     
    -def _create_udf(f, returnType, vectorized):
    +def _create_udf(f, returnType, pythonUdfType):
     
    -    def _udf(f, returnType=StringType(), vectorized=vectorized):
    -        if vectorized:
    +    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    +        if pythonUdfType == PythonUdfType.PANDAS_UDF \
    +           or pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
                 import inspect
                 argspec = inspect.getargspec(f)
                 if len(argspec.args) == 0 and argspec.varargs is None:
                     raise ValueError(
                         "0-arg pandas_udfs are not supported. "
                         "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    --- End diff --
    
    Thanks! I'll update the message.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144768754
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2195,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, vectorized=True, grouped=False)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    Inside this method we can create a `StructType` with `returnTypes` and pass it to `_create_udf`.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    The change itself LGTM, if we are okay with separating this.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82811 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82811/testReport)** for PR 19505 at commit [`fdafb35`](https://github.com/apache/spark/commit/fdafb3561d44ca2583380b7aeaf7843ce5285b1e).


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82805/
    Test PASSed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82813 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82813/testReport)** for PR 19505 at commit [`7332969`](https://github.com/apache/spark/commit/733296951b45d760aa0a8465eb0189077ea67372).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145183505
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    I submitted another pr #19517 based on this as a comparison.
    I guess it covers what you are thinking.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82831 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82831/testReport)** for PR 19505 at commit [`85f250d`](https://github.com/apache/spark/commit/85f250d0eda56606a599c5fb15046ef0fd63a3c4).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144924728
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2121,33 +2127,40 @@ def wrapper(*args):
     
             wrapper.func = self.func
             wrapper.returnType = self.returnType
    -        wrapper.vectorized = self.vectorized
    +        wrapper.pythonUdfType = self.pythonUdfType
     
             return wrapper
     
     
    -def _create_udf(f, returnType, vectorized):
    +def _create_udf(f, returnType, pythonUdfType):
     
    -    def _udf(f, returnType=StringType(), vectorized=vectorized):
    -        if vectorized:
    +    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    +        if pythonUdfType == PythonUdfType.PANDAS_UDF:
                 import inspect
                 argspec = inspect.getargspec(f)
                 if len(argspec.args) == 0 and argspec.varargs is None:
                     raise ValueError(
                         "0-arg pandas_udfs are not supported. "
                         "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
                     )
    -        udf_obj = UserDefinedFunction(f, returnType, vectorized=vectorized)
    +        elif pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
    +            import inspect
    +            argspec = inspect.getargspec(f)
    +            if len(argspec.args) != 1 and argspec.varargs is None:
    +                raise ValueError("Only 1-arg pandas_grouped_udfs are supported.")
    +
    +        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
             return udf_obj._wrapped()
     
         # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    --- End diff --
    
    Thanks! I'll update it.
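
The argument-count checks in the diff above can be tried outside Spark. The following is a minimal sketch, not the PR's code: `check_udf_arity` and `errors` are hypothetical helper names, the enum values are placeholders, and `inspect.getfullargspec` stands in for `inspect.getargspec`, which was removed in Python 3.11.

```python
import inspect
from enum import Enum


class PythonUdfType(Enum):
    # Member names mirror the diff; the numeric values are placeholders.
    PANDAS_UDF = 1
    PANDAS_GROUPED_UDF = 2


def check_udf_arity(f, python_udf_type):
    """Raise ValueError if f's signature does not fit the given UDF type."""
    # getfullargspec is used because getargspec no longer exists in Python 3.11+.
    argspec = inspect.getfullargspec(f)
    if python_udf_type == PythonUdfType.PANDAS_UDF:
        # At least one positional argument, or *args, is required.
        if len(argspec.args) == 0 and argspec.varargs is None:
            raise ValueError("0-arg pandas_udfs are not supported.")
    elif python_udf_type == PythonUdfType.PANDAS_GROUPED_UDF:
        # Exactly one positional argument is required, unless *args is used.
        if len(argspec.args) != 1 and argspec.varargs is None:
            raise ValueError("Only 1-arg pandas_grouped_udfs are supported.")
    return f


def errors(f, python_udf_type):
    """Return the validation error message, or None if the check passes."""
    try:
        check_udf_arity(f, python_udf_type)
    except ValueError as e:
        return str(e)
    return None
```

As in the diff, a function that takes `*args` passes both checks, because the check only rejects a signature when `varargs` is `None`.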


---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    +1 for a separate JIRA to clarify the proposal, and +0 for 3. out of those three, too.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144780828
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2195,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, vectorized=True, grouped=False)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    Ah, I see, makes sense.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144768652
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2044,7 +2044,7 @@ class UserDefinedFunction(object):
     
         .. versionadded:: 1.3
         """
    -    def __init__(self, func, returnType, name=None, vectorized=False):
    +    def __init__(self, func, returnType, name=None, vectorized=False, grouped=False):
    --- End diff --
    
    Sounds good. I'll modify it.
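
The diff above adds a second boolean flag, while other diffs in this thread pass a single `pythonUdfType` value instead. The following sketch shows one way the two representations could relate. It rests on assumptions: `from_flags` is a hypothetical helper, `NORMAL_UDF` and the numeric values do not appear in the quoted diffs, and the rule that `grouped` implies `vectorized` is inferred rather than stated in the thread.

```python
from enum import Enum


class PythonUdfType(Enum):
    # Only PANDAS_UDF and PANDAS_GROUPED_UDF appear in the quoted diffs;
    # NORMAL_UDF and all numeric values are assumptions for this sketch.
    NORMAL_UDF = 0
    PANDAS_UDF = 1
    PANDAS_GROUPED_UDF = 2


def from_flags(vectorized=False, grouped=False):
    """Map the two boolean flags onto one UDF type, rejecting the invalid pair."""
    if grouped and not vectorized:
        # Two booleans can express this combination even though it is
        # assumed here to be meaningless; a single enum value cannot.
        raise ValueError("grouped=True requires vectorized=True")
    if grouped:
        return PythonUdfType.PANDAS_GROUPED_UDF
    if vectorized:
        return PythonUdfType.PANDAS_UDF
    return PythonUdfType.NORMAL_UDF
```

With a single value, every state a caller can pass names a real UDF kind, so no combination check is needed at the call site.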


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82790/
    Test FAILed.


---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    @cloud-fan asked:
    "
    what's the difference between transform and group_transform? Seems we don't need to care about it both in usage and implementation.
    "
    
    My answer is:
    transform defines a transformation that doesn't rely on grouping semantics. For instance, this is a wrong udf definition:
    
    @pandas_udf(DoubleType(), TRANSFORM)
    def foo(v):
        return (v - v.mean()) / v.std()
    
    because the transformation relies on some kind of "grouping semantics"; otherwise v.mean() and v.std() have no meaning for an arbitrary grouping.
    
    Also, catalyst should throw an exception for the code example below:
    ```
    @pandas_udf(DoubleType(), GROUP_TRANSFORM)
    def foo(v):
        return (v - v.mean()) / v.std()
    
    # Should throw an exception here: withColumn should only take the `transform` type, not `group_transform`
    df = df.withColumn("v", foo(df.v))
    ```
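
The point about grouping semantics can be illustrated without Spark. The following is a minimal sketch in which plain Python lists stand in for `pandas.Series` batches; `normalize` is a hypothetical stand-in for `foo`, and `statistics.stdev` is the sample standard deviation, matching the default of `pandas.Series.std`.

```python
from statistics import mean, stdev


def normalize(values):
    """Standardize values using the mean and sample std of this batch only."""
    m, s = mean(values), stdev(values)
    return [(v - m) / s for v in values]


v = [1.0, 2.0, 3.0, 5.0, 10.0]

# One batch containing every row.
whole = normalize(v)

# The same rows split into two batches, which an engine is free to do
# when the function carries no grouping semantics.
split = normalize(v[:2]) + normalize(v[2:])

print(whole[0], split[0])
```

The two results differ row by row, so the output of such a function depends on how the rows happen to be batched. The `split` values coincide with the grouped `normalize` output quoted in the docstring examples elsewhere in this thread, where the batches are the `id` groups.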


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    retest this please


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82811 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82811/testReport)** for PR 19505 at commit [`fdafb35`](https://github.com/apache/spark/commit/fdafb3561d44ca2583380b7aeaf7843ce5285b1e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82843 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82843/testReport)** for PR 19505 at commit [`1ef25c3`](https://github.com/apache/spark/commit/1ef25c34b9d23cb2a8c6a1b28e81b8c9c0ad377e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144926010
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2205,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    --- End diff --
    
    It's an output requirement.
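
For illustration only, a minimal sketch of what enforcing that output requirement could look like. The helper names `check_result_length` and `apply_checked` are hypothetical and not part of this patch, and plain lists stand in for `pandas.Series` so the sketch runs without pandas.

```python
def check_result_length(result, expected_length):
    """Hypothetical helper: the UDF result must be as long as its input."""
    if not hasattr(result, "__len__"):
        raise TypeError("The user-defined function must return a sized sequence")
    if len(result) != expected_length:
        raise RuntimeError(
            "Result of the user-defined function has the wrong length: "
            "expected %d, got %d" % (expected_length, len(result))
        )
    return result


def apply_checked(f, *columns):
    """Call f on the input columns and verify the length of what it returns."""
    # Assumption: all input columns in one batch share the same length.
    return check_result_length(f(*columns), len(columns[0]))
```

The check runs on the returned value only, which matches the reading above: the input is not constrained, the result is.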


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82805 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82805/testReport)** for PR 19505 at commit [`789e642`](https://github.com/apache/spark/commit/789e642763ab4f59e14137fcc75b514223bc7aae).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144917109
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2205,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    --- End diff --
    
    What happens if we do not pass a primitive data type? Do we have a test case for this?


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144857957
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2121,33 +2127,35 @@ def wrapper(*args):
     
             wrapper.func = self.func
             wrapper.returnType = self.returnType
    -        wrapper.vectorized = self.vectorized
    +        wrapper.pythonUdfType = self.pythonUdfType
     
             return wrapper
     
     
    -def _create_udf(f, returnType, vectorized):
    +def _create_udf(f, returnType, pythonUdfType):
     
    -    def _udf(f, returnType=StringType(), vectorized=vectorized):
    -        if vectorized:
    +    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    +        if pythonUdfType == PythonUdfType.PANDAS_UDF \
    +           or pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
                 import inspect
                 argspec = inspect.getargspec(f)
                 if len(argspec.args) == 0 and argspec.varargs is None:
                     raise ValueError(
                         "0-arg pandas_udfs are not supported. "
                         "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    --- End diff --
    
    I think so. If it doesn't become too complicated, maybe we can also check it for pandas_grouped_udf.
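
As a rough sketch of the check discussed here, the zero-argument test from the diff can be factored into one helper and reused for both decorators. The name `check_udf_arity` and its `kind` parameter are hypothetical; `inspect.getfullargspec` is used in place of the `inspect.getargspec` call in the diff because the latter is no longer available in recent Python 3 releases.

```python
import inspect


def check_udf_arity(f, kind="pandas_udf"):
    """Hypothetical helper: reject callables that accept no arguments."""
    argspec = inspect.getfullargspec(f)
    # A function with *args can still receive columns, so only reject
    # functions with neither positional parameters nor varargs.
    if len(argspec.args) == 0 and argspec.varargs is None:
        raise ValueError(
            "0-arg %ss are not supported. "
            "Instead, create a 1-arg %s and ignore the arg in your function."
            % (kind, kind)
        )
    return f
```

Calling it as `check_udf_arity(f, kind="pandas_grouped_udf")` would apply the same rule to the grouped decorator, with the error message naming the decorator the user actually called.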


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82831/
    Test FAILed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    @icexelloss The summary and proposal 3 look great. To prevent confusion, can you also add the usage of each function type to proposal 3? E.g., group_map is for `groupby().apply()`, transform is for `withColumn`, etc. Thanks.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82813 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82813/testReport)** for PR 19505 at commit [`7332969`](https://github.com/apache/spark/commit/733296951b45d760aa0a8465eb0189077ea67372).


---



[GitHub] spark pull request #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] group...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145637565
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    I think we should consider merging #19517 first. It improves the behavior by introducing `PythonUdfType`, which replaces the hack of detecting the udf type from the return type at the worker, and it makes no user-facing API changes relative to #18732.
    The proposal and discussion should stay in this PR, but outside any review thread so that they are not collapsed.
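
To illustrate the difference, here is a minimal sketch of tagging a udf with an explicit kind and dispatching on that tag rather than on the return type. `UdfKind`, `tag_udf`, and `choose_eval_path` are hypothetical names; the member names mirror the `PythonUdfType` values visible in the diff, but `NORMAL_UDF` and the integer values are assumptions, and return types are plain strings here so the sketch does not need pyspark.

```python
class UdfKind(object):
    """Hypothetical stand-in for PythonUdfType; values are assumed."""
    NORMAL_UDF = 0
    PANDAS_UDF = 1
    PANDAS_GROUPED_UDF = 2


def tag_udf(f, return_type, kind):
    """Wrap f and record its kind explicitly on the wrapper."""
    def wrapper(*args):
        return f(*args)
    wrapper.func = f
    wrapper.returnType = return_type
    wrapper.pythonUdfType = kind
    return wrapper


def choose_eval_path(udf):
    """Pick an evaluation path from the tag alone, never from returnType."""
    if udf.pythonUdfType == UdfKind.PANDAS_GROUPED_UDF:
        return "grouped"
    if udf.pythonUdfType == UdfKind.PANDAS_UDF:
        return "vectorized"
    return "row-at-a-time"
```

With an explicit tag, a scalar udf that happens to declare a struct return type would still be routed as a scalar udf, which inference from the return type cannot guarantee.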


---



[GitHub] spark pull request #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] group...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145744005
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    Proposal 3 looks great! One minor question: what's the difference between `transform` and `group_transform`? It seems we don't need to care about the difference in either usage or implementation.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144780130
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2195,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, vectorized=True, grouped=False)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    The fields of the return type are used as the output of the plan. I guess the field names are also useful for users.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82803 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82803/testReport)** for PR 19505 at commit [`10512a6`](https://github.com/apache/spark/commit/10512a64a9560eee6d3f65802abd042dedf0cafb).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82803 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82803/testReport)** for PR 19505 at commit [`10512a6`](https://github.com/apache/spark/commit/10512a64a9560eee6d3f65802abd042dedf0cafb).


---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    The group_transform udfs look a bit weird to me. @icexelloss Can you explain their use case?


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82843 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82843/testReport)** for PR 19505 at commit [`1ef25c3`](https://github.com/apache/spark/commit/1ef25c34b9d23cb2a8c6a1b28e81b8c9c0ad377e).


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82813/
    Test PASSed.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144933717
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    Per discussion here:
    https://github.com/apache/spark/pull/18732#issuecomment-336976746
    
    Should we consider converting `pandas_udf` to `pandas_grouped_udf` implicitly in `groupby apply`, and not introducing `pandas_grouped_udf` as a user-facing API?
    
    `groupby apply` implies the udf is a grouped udf, so there should not be ambiguity here.


---



[GitHub] spark pull request #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] group...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145788248
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    Sorry for the late reply.
    
    @gatorsmile Sounds good. I will copy the discussion in this PR as @ueshin suggested.
    
    @ueshin +1 to merge #19517. I think it's a good change and will make it easier for later changes.
    
    @cloud-fan `transform` defines a transformation that doesn't rely on grouping semantics. For instance, this is a wrong udf definition:
    ```
    @pandas_udf(DoubleType(), TRANSFORM)
    def foo(v):
        return (v - v.mean()) / v.std()
    ```
    because the transformation relies on some kind of "grouping semantics"; otherwise `v.mean()` and `v.std()` have no meaning for an arbitrary grouping.
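
The point about grouping semantics can be checked with plain pandas, outside Spark. The sketch below is only an illustration: `apply_in_batches` is a hypothetical helper that stands in for whatever batching the engine happens to choose, and it is not part of any Spark API. An element-wise function gives the same result however the rows are split, while a function that uses `v.mean()` and `v.std()` does not.

```python
import pandas as pd


def add_one(v):
    # Element-wise: each output value depends only on its own input value.
    return v + 1


def normalize(v):
    # Uses v.mean() and v.std(), so the output depends on which rows share a batch.
    return (v - v.mean()) / v.std()


def apply_in_batches(func, series, sizes):
    """Hypothetical helper: apply func to consecutive slices and concatenate."""
    out, start = [], 0
    for size in sizes:
        out.append(func(series.iloc[start:start + size]))
        start += size
    return pd.concat(out)


v = pd.Series([1.0, 2.0, 3.0, 5.0, 10.0])

# add_one: one batch of 5 rows and batches of 2 + 3 rows agree.
whole = add_one(v)
split = apply_in_batches(add_one, v, [2, 3])

# normalize: the two batchings disagree, so the result is only
# well defined once the grouping is fixed.
norm_whole = normalize(v)
norm_split = apply_in_batches(normalize, v, [2, 3])

print(whole.equals(split))            # batching does not matter
print(norm_whole.equals(norm_split))  # batching matters
```
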



---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    So, it looks like we are good to go?


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82810 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82810/testReport)** for PR 19505 at commit [`122a7bc`](https://github.com/apache/spark/commit/122a7bccaff11def2c12cfccdd00244394ed3478).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82835 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82835/testReport)** for PR 19505 at commit [`85f250d`](https://github.com/apache/spark/commit/85f250d0eda56606a599c5fb15046ef0fd63a3c4).


---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Btw, I think the scope of this change is more than just a follow-up. Should we create another JIRA for it?


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    I'd mark this pr as [WIP] for now because we haven't reached consensus on the API changes. Thanks.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82790 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82790/testReport)** for PR 19505 at commit [`4d2bd95`](https://github.com/apache/spark/commit/4d2bd959e1eeabb4f72cfbb52a374ce721030507).


---



[GitHub] spark pull request #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] group...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145552397
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    Here is a summary of the current proposals:
    
    I. Use only `pandas_udf`
    --------------------------
    The main issue with this approach, as a few people have pointed out, is that it is hard to know what the udf does without looking at the implementation.
    For instance, for a udf:
    ```
    @pandas_udf(DoubleType())
    def foo(v):
          ...
    ``` 
    It's hard to tell whether this function is a reduction that returns a scalar double, or a transform function that returns a pd.Series of double.
    
    This is less than ideal because:
    * The user of the udf cannot tell which functions this udf can be used with, i.e., can it be used with `groupby().apply()`, `withColumn`, or `groupby().agg()`?
    * Catalyst cannot do validation at the planning phase, i.e., it cannot throw an exception if the user passes a transformation function rather than an aggregation function to `groupby().agg()`
    
    II. Use different decorators, i.e., `pandas_udf` (or `pandas_scalar_udf`), `pandas_grouped_udf`, `pandas_udaf`
    ----------------------------------------------------------------------------------------------------------------
    The idea of this approach is to use `pandas_grouped_udf` for all group udfs, and `pandas_scalar_udf` for scalar pandas udfs that get used with `withColumn`. This helps distinguish scalar udfs from group udfs. However, it doesn't help distinguish among group udfs, for instance, the group transform and group aggregation examples above.
     
    III. Use the `pandas_udf` decorator with a function type enum for "one-step" vectorized udfs, and `pandas_udaf` for multi-step aggregation functions
    ----------------------------------------------------------
    This approach uses a function type enum to describe what the udf does. Here are the proposed function types:
    * transform
    A pd.Series(s) -> pd.Series transformation that is independent of the grouping. This is the existing scalar pandas udf.
    * group_transform
    A pd.Series(s) -> pd.Series transformation that is dependent on the grouping, e.g.
    ```
    @pandas_udf(DoubleType(), GROUP_TRANSFORM)
    def foo(v):
          return (v - v.mean()) / v.std()
    ```
    * group_aggregate:
    A pd.Series(s) -> scalar function, e.g.
    ```
    @pandas_udf(DoubleType(), GROUP_AGGREGATE)
    def foo(v):
          return v.mean()
    ```
    * group_map (maybe a better name):
    This defines a pd.DataFrame -> pd.DataFrame transformation. This is the current `groupby().apply()` udf
    
    These types also work with window functions, because window functions are either (1) group_transform (rank) or (2) group_aggregate (first, last).
    
    I am in favor of (III). What do you guys think?
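
A rough sketch of how a declared function type could allow validation before the function runs, which is the benefit claimed for proposal III. Everything here is a stand-in written for illustration: `FunctionType`, `tagged_udf` and `check_agg` are hypothetical names, not the API proposed in this PR, and plain pandas `groupby` takes the place of Spark.

```python
from enum import Enum

import pandas as pd


class FunctionType(Enum):
    # Hypothetical names mirroring the four types listed in the comment above.
    TRANSFORM = "transform"
    GROUP_TRANSFORM = "group_transform"
    GROUP_AGGREGATE = "group_aggregate"
    GROUP_MAP = "group_map"


def tagged_udf(function_type):
    """Hypothetical decorator: record the declared function type on the function."""
    def decorator(func):
        func.function_type = function_type
        return func
    return decorator


@tagged_udf(FunctionType.GROUP_TRANSFORM)
def normalize(v):
    return (v - v.mean()) / v.std()


@tagged_udf(FunctionType.GROUP_AGGREGATE)
def mean(v):
    return v.mean()


def check_agg(func):
    """Reject anything not declared as an aggregation, without calling it."""
    if func.function_type is not FunctionType.GROUP_AGGREGATE:
        raise TypeError(
            "%s is declared as %s, not group_aggregate"
            % (func.__name__, func.function_type.value))
    return func


pdf = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

# Accepted: declared as an aggregation, yields one value per group.
per_group_mean = pdf.groupby("id")["v"].agg(check_agg(mean))

# Rejected up front: declared as a group transform, so it never reaches agg().
try:
    check_agg(normalize)
    rejected = False
except TypeError:
    rejected = True
```

With an undecorated function there is nothing to inspect, so the same mistake would only surface when the function is executed on data.
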



---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144859965
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2121,33 +2127,35 @@ def wrapper(*args):
     
             wrapper.func = self.func
             wrapper.returnType = self.returnType
    -        wrapper.vectorized = self.vectorized
    +        wrapper.pythonUdfType = self.pythonUdfType
     
             return wrapper
     
     
    -def _create_udf(f, returnType, vectorized):
    +def _create_udf(f, returnType, pythonUdfType):
     
    -    def _udf(f, returnType=StringType(), vectorized=vectorized):
    -        if vectorized:
    +    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    +        if pythonUdfType == PythonUdfType.PANDAS_UDF \
    +           or pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
    --- End diff --
    
    Yes, I'll add it.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82831 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82831/testReport)** for PR 19505 at commit [`85f250d`](https://github.com/apache/spark/commit/85f250d0eda56606a599c5fb15046ef0fd63a3c4).


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82790 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82790/testReport)** for PR 19505 at commit [`4d2bd95`](https://github.com/apache/spark/commit/4d2bd959e1eeabb4f72cfbb52a374ce721030507).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144917550
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2205,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    --- End diff --
    
    Is this just a fact? or an input requirement?
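
The question is not answered at this point in the thread. If the docstring sentence is read as a requirement on the user's function, a check along the following lines would enforce it. This is a sketch in plain pandas under that assumption; `checked` is a hypothetical wrapper written for illustration and does not describe how Spark handles the result.

```python
import pandas as pd


def checked(func):
    """Hypothetical wrapper: fail if the result length differs from the input length."""
    def wrapper(*series):
        result = func(*series)
        expected = len(series[0])
        if len(result) != expected:
            raise ValueError(
                "expected a result of length %d, got %d" % (expected, len(result)))
        return result
    return wrapper


s = pd.Series(["a", "bb", "ccc"])

# Same length as the input: accepted.
lengths = checked(lambda x: x.str.len())(s)

# Shorter than the input: rejected.
try:
    checked(lambda x: x.head(2))(s)
    length_error = None
except ValueError as exc:
    length_error = str(exc)
```
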


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144929491
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2205,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    --- End diff --
    
    Yes, they can, and it will fail. See https://github.com/apache/spark/blob/122a7bccaff11def2c12cfccdd00244394ed3478/python/pyspark/sql/tests.py#L3316-L3325
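
The requirement discussed here (the returned `pandas.Series` must have the same length as the input) can be illustrated with a dependency-free sketch. Plain lists stand in for `pandas.Series`, and `check_result_length` is a hypothetical helper written for this illustration, not the check Spark itself performs.

```python
def check_result_length(func):
    """Wrap `func` so that a result of the wrong length raises an error.

    Plain lists stand in for pandas.Series so the sketch has no dependencies.
    `check_result_length` is a hypothetical name, not a Spark API.
    """
    def wrapper(*columns):
        result = func(*columns)
        expected = len(columns[0])
        if len(result) != expected:
            raise RuntimeError(
                "Result vector was not the required length: expected %d, got %d"
                % (expected, len(result))
            )
        return result
    return wrapper


@check_result_length
def add_one(xs):
    # Same length as the input, so the check passes.
    return [x + 1 for x in xs]


@check_result_length
def drop_first(xs):
    # One element short, so the check rejects it.
    return xs[1:]


ok = add_one([1, 2, 3])

try:
    drop_first([1, 2, 3])
    failed = False
except RuntimeError:
    failed = True
```

In this sketch the length rule is an input requirement on the user's function that is enforced at run time, which matches the answer given in the comment above.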


---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    @viirya @cloud-fan I updated my original summary. I think it answers the `group_transform` question. I also added more examples for each type.
    
    @HyukjinKwon @viirya I agree we can move this to a separate Jira and merge @ueshin's current PR. Maybe I can open another PR with just the proposal design doc? I'm not sure what the best way is.
    
     
    



---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    @ueshin Maybe close this PR?


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82793 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82793/testReport)** for PR 19505 at commit [`f096870`](https://github.com/apache/spark/commit/f0968702038e11c9c9a8f305c61f72d3f9e00f9a).


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82811/
    Test PASSed.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145012436
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -137,11 +137,15 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
               udf.references.subsetOf(child.outputSet)
             }
             if (validUdfs.nonEmpty) {
    +          if (validUdfs.find(_.pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF).isDefined) {
    --- End diff --
    
    nit: maybe
    
    ```scala
    validUdfs.exists(_.pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF)
    ```


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144839348
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2121,33 +2127,35 @@ def wrapper(*args):
     
             wrapper.func = self.func
             wrapper.returnType = self.returnType
    -        wrapper.vectorized = self.vectorized
    +        wrapper.pythonUdfType = self.pythonUdfType
     
             return wrapper
     
     
    -def _create_udf(f, returnType, vectorized):
    +def _create_udf(f, returnType, pythonUdfType):
     
    -    def _udf(f, returnType=StringType(), vectorized=vectorized):
    -        if vectorized:
    +    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    +        if pythonUdfType == PythonUdfType.PANDAS_UDF \
    +           or pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
                 import inspect
                 argspec = inspect.getargspec(f)
                 if len(argspec.args) == 0 and argspec.varargs is None:
                     raise ValueError(
                         "0-arg pandas_udfs are not supported. "
                         "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    --- End diff --
    
    Maybe also update this error message.
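
One way the message could name whichever decorator was actually used is sketched below. `check_udf_arity` and its `decorator_name` parameter are hypothetical names for this illustration, and `inspect.getfullargspec` is used in place of the `getargspec` call shown in the diff, since `getargspec` is not available in recent Python 3 releases.

```python
import inspect


def check_udf_arity(f, decorator_name):
    """Reject zero-argument functions, naming the decorator that was used.

    `check_udf_arity` and `decorator_name` are hypothetical names for this sketch.
    """
    argspec = inspect.getfullargspec(f)
    # A function with *args can still receive columns, so only reject
    # functions that accept no positional arguments at all.
    if len(argspec.args) == 0 and argspec.varargs is None:
        raise ValueError(
            "0-arg %ss are not supported. "
            "Instead, create a 1-arg %s and ignore the arg in your function."
            % (decorator_name, decorator_name)
        )
    return f


def no_args():
    return 1


def one_arg(pdf):
    return pdf


def var_args(*cols):
    return cols


accepted_one = check_udf_arity(one_arg, "pandas_grouped_udf")
accepted_var = check_udf_arity(var_args, "pandas_udf")

try:
    check_udf_arity(no_args, "pandas_grouped_udf")
    message = None
except ValueError as e:
    message = str(e)
```

With this shape, a zero-argument function passed to the grouped decorator would produce a message that mentions `pandas_grouped_udf` rather than `pandas_udf`.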


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144924765
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2038,13 +2038,19 @@ def _wrap_function(sc, func, returnType):
                                       sc.pythonVer, broadcast_vars, sc._javaAccumulator)
     
     
    +class PythonUdfType(object):
    --- End diff --
    
    Sure, I'll add the descriptions.
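
A rough sketch of what a documented `PythonUdfType` might look like follows. Only `PANDAS_UDF` and `PANDAS_GROUPED_UDF` appear in the quoted diff; the `NORMAL_UDF` member, the integer values, and the `is_vectorized` helper are assumptions made for this illustration.

```python
class PythonUdfType(object):
    """Sketch of the UDF-type constants.

    The integer values and the NORMAL_UDF member are assumed for
    illustration; they are not taken from the quoted diff.
    """

    # Row-at-a-time Python UDF (assumed member).
    NORMAL_UDF = 0
    # Vectorized UDF: one or more pandas.Series -> a pandas.Series.
    PANDAS_UDF = 1
    # Grouped vectorized UDF: a pandas.DataFrame -> a pandas.DataFrame,
    # used with GroupedData.apply.
    PANDAS_GROUPED_UDF = 2


def is_vectorized(python_udf_type):
    """Hypothetical helper mirroring the check in `_udf`:
    both pandas variants count as vectorized."""
    return python_udf_type in (
        PythonUdfType.PANDAS_UDF,
        PythonUdfType.PANDAS_GROUPED_UDF,
    )
```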


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82793/
    Test PASSed.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82835 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82835/testReport)** for PR 19505 at commit [`85f250d`](https://github.com/apache/spark/commit/85f250d0eda56606a599c5fb15046ef0fd63a3c4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] group...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145568265
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    Could you post it in the other PR, https://github.com/apache/spark/pull/19517? This discussion thread will be collapsed when Takuya makes a code change.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82835/
    Test PASSed.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144768646
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2195,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, vectorized=True, grouped=False)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    How about `returnTypes` without a default value? `pandas_grouped_udf` always returns a DataFrame, so we should just ask users to give the data type of each column.
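
The suggestion to drop the default can be sketched in plain Python. `grouped_udf_sketch` is a hypothetical decorator factory, and the schema is written as `(name, type)` pairs purely for illustration; neither is the PR's actual API.

```python
def grouped_udf_sketch(returnType):
    """Hypothetical decorator factory: `returnType` has no default,
    so callers must describe the output columns explicitly."""
    if not returnType:
        raise ValueError("returnType must describe every output column")

    def decorator(f):
        # Attach the declared schema to the function, as the wrapper in the diff does.
        f.returnType = returnType
        return f
    return decorator


@grouped_udf_sketch(returnType=[("id", "long"), ("v", "double")])
def normalize(pdf):
    return pdf


try:
    # Omitting the schema fails immediately: Python raises TypeError
    # for the missing required argument.
    grouped_udf_sketch()
    missing_rejected = False
except TypeError:
    missing_rejected = True
```

Without a default, forgetting the schema fails at decoration time instead of silently falling back to an empty `StructType()`.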


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144859680
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2121,33 +2127,35 @@ def wrapper(*args):
     
             wrapper.func = self.func
             wrapper.returnType = self.returnType
    -        wrapper.vectorized = self.vectorized
    +        wrapper.pythonUdfType = self.pythonUdfType
     
             return wrapper
     
     
    -def _create_udf(f, returnType, vectorized):
    +def _create_udf(f, returnType, pythonUdfType):
     
    -    def _udf(f, returnType=StringType(), vectorized=vectorized):
    -        if vectorized:
    +    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    +        if pythonUdfType == PythonUdfType.PANDAS_UDF \
    +           or pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
                 import inspect
                 argspec = inspect.getargspec(f)
                 if len(argspec.args) == 0 and argspec.varargs is None:
                     raise ValueError(
                         "0-arg pandas_udfs are not supported. "
                         "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    --- End diff --
    
    Thanks, let me try.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144936208
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2205,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    --- End diff --
    
    I see. Thanks!


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145225179
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +    >>> from pyspark.sql.types import IntegerType, StringType
    +    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +    >>> @pandas_udf(returnType=StringType())
    +    ... def to_upper(s):
    +    ...     return s.str.upper()
    +    ...
    +    >>> @pandas_udf(returnType="integer")
    +    ... def add_one(x):
    +    ...     return x + 1
    +    ...
    +    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +    ...     .show()  # doctest: +SKIP
    +    +----------+--------------+------------+
    +    |slen(name)|to_upper(name)|add_one(age)|
    +    +----------+--------------+------------+
    +    |         8|      JOHN DOE|          22|
    +    +----------+--------------+------------+
    +
    +    .. note:: The user-defined function must be deterministic.
    +    """
    +    return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)
    +
    +
    +@since(2.3)
    +def pandas_grouped_udf(f=None, returnType=StructType()):
    --- End diff --
    
    Thanks @ueshin, yes, that's what I am thinking.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82793 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82793/testReport)** for PR 19505 at commit [`f096870`](https://github.com/apache/spark/commit/f0968702038e11c9c9a8f305c61f72d3f9e00f9a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class PythonUdfType(object):`


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144927194
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2205,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    +    The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    --- End diff --
    
    Can users break this requirement? If so, what happens?


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144859561
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2121,33 +2127,35 @@ def wrapper(*args):
     
             wrapper.func = self.func
             wrapper.returnType = self.returnType
    -        wrapper.vectorized = self.vectorized
    +        wrapper.pythonUdfType = self.pythonUdfType
     
             return wrapper
     
     
    -def _create_udf(f, returnType, vectorized):
    +def _create_udf(f, returnType, pythonUdfType):
     
    -    def _udf(f, returnType=StringType(), vectorized=vectorized):
    -        if vectorized:
    +    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    +        if pythonUdfType == PythonUdfType.PANDAS_UDF \
    +           or pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
    --- End diff --
    
    Shall we add a check that `PANDAS_GROUPED_UDF` can only take one parameter?
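
A minimal sketch of such a check, assuming a standalone helper named `check_grouped_udf_arity` (hypothetical, not part of the patch). It mirrors the `argspec`-based test used elsewhere in this diff, but calls `inspect.getfullargspec`, since `inspect.getargspec` is no longer available in recent Python versions.

```python
import inspect


def check_grouped_udf_arity(f):
    """Hypothetical helper: reject functions that cannot be called with one argument.

    Mirrors the condition discussed in the review: a function passes if it
    declares exactly one positional argument, or if it accepts *args.
    """
    spec = inspect.getfullargspec(f)
    if len(spec.args) != 1 and spec.varargs is None:
        raise ValueError("Only 1-arg pandas_grouped_udfs are supported.")
    return f


# A one-argument function is accepted and returned unchanged.
normalize = check_grouped_udf_arity(lambda pdf: pdf)

# A two-argument function is rejected at definition time.
try:
    check_grouped_udf_arity(lambda a, b: a)
    rejected = False
except ValueError:
    rejected = True
```

Note that, as written, this condition lets any function with `*args` through regardless of how many named arguments it has; a stricter check would be a separate design decision.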


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r145027904
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -137,11 +137,15 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
               udf.references.subsetOf(child.outputSet)
             }
             if (validUdfs.nonEmpty) {
    +          if (validUdfs.find(_.pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF).isDefined) {
    --- End diff --
    
    Thanks! I'll update it.


---



[GitHub] spark pull request #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] group...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin closed the pull request at:

    https://github.com/apache/spark/pull/19505


---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Here is a summary of the current proposals from some offline discussion:
    
    I. Use only `pandas_udf`
    --------------------------
    The main issue with this approach, as a few people have pointed out, is that it is hard to know what the udf does without looking at the implementation.
    For instance, for a udf:
    ```
    @pandas_udf(DoubleType())
    def foo(v):
          ...
    ``` 
    It's hard to tell whether this function is a reduction that returns a scalar double, or a transform function that returns a pd.Series of double.
    
    This is less than ideal because:
    * The user of the udf cannot tell which functions this udf can be used with, i.e., can it be used with `groupby().apply()`, `withColumn`, or `groupby().agg()`?
    * Catalyst cannot do validation at the planning phase, i.e., it cannot throw an exception if the user passes a transformation function rather than an aggregation function to `groupby().agg()`.
    
    II. Use different decorators, i.e., `pandas_udf` (or `pandas_scalar_udf`), `pandas_grouped_udf`, `pandas_udaf`
    ----------------------------------------------------------------------------------------------------------------
    The idea of this approach is to use `pandas_grouped_udf` for all group udfs, and `pandas_scalar_udf` for scalar pandas udfs that are used with `withColumn`. This helps distinguish scalar udfs from group udfs. However, it doesn't help distinguish among group udfs. For instance, it cannot tell apart the group transform and group aggregation cases from the example above.
     
    III. Use the `pandas_udf` decorator with a function type enum for "one-step" vectorized udfs, and `pandas_udaf` for multi-step aggregation functions
    ----------------------------------------------------------
    This approach uses a function type enum to describe what the udf does. Here are the proposed function types:
    * transform
    A pd.Series(s) -> pd.Series transformation that is independent of the grouping. This is the existing scalar pandas udf.
    * group_transform
    A pd.Series(s) -> pd.Series transformation that depends on the grouping, e.g.
    ```
    @pandas_udf(DoubleType(), GROUP_TRANSFORM)
    def foo(v):
          return (v - v.mean()) / v.std()
    ```
    * group_aggregate:
    A pd.Series(s) -> scalar function, e.g.
    ```
    @pandas_udf(DoubleType(), GROUP_AGGREGATE)
    def foo(v):
          return v.mean()
    ```
    * group_map (there may be a better name):
    This defines a pd.DataFrame -> pd.DataFrame transformation. This is the current `groupby().apply()` udf.
    
    These types also work with window functions, because window functions are either (1) group_transform (e.g. rank) or (2) group_aggregate (e.g. first, last).
    
    I am in favor of (III). What do you guys think?
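
To make the four proposed function types concrete, here is a rough sketch in plain pandas, with no Spark involved. The data reuses the `id`/`v` example from the docstring under review; the helper names (`normalize`, `subtract_mean`) are illustrative assumptions, and the mapping to Spark's eventual API is not implied.

```python
import pandas as pd

pdf = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

# transform: Series -> Series, independent of any grouping.
plus_one = pdf["v"] + 1


# group_transform: Series -> Series of the same length, computed per group.
def normalize(v):
    return (v - v.mean()) / v.std()


normalized = pdf.groupby("id")["v"].transform(normalize)

# group_aggregate: Series -> scalar, yielding one value per group.
means = pdf.groupby("id")["v"].agg(lambda v: v.mean())


# group_map: DataFrame -> DataFrame, called once per group.
def subtract_mean(group):
    return group.assign(v=group["v"] - group["v"].mean())


# Iterate over the groups explicitly and stitch the results back together.
mapped = pd.concat([subtract_mean(group) for _, group in pdf.groupby("id")])
```

The point of the enum in (III) is that `normalize` and the mean lambda have the same input type and the same declared element type (double), so only an explicit function type tells them apart.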



---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144768380
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2044,7 +2044,7 @@ class UserDefinedFunction(object):
     
         .. versionadded:: 1.3
         """
    -    def __init__(self, func, returnType, name=None, vectorized=False):
    +    def __init__(self, func, returnType, name=None, vectorized=False, grouped=False):
    --- End diff --
    
    `vectorized=False, grouped=True` is an invalid combination. How about we introduce a `udfType`, where `0` means normal udf, `1` means pandas udf, and `2` means pandas grouped udf? We can create something like `object PythonEvalType` to keep this encoding in sync between Python and Java.
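
A small sketch of the suggestion, assuming the `0`/`1`/`2` encoding proposed here and the constant names used elsewhere in this thread; the values actually used in the patch are not shown in this archive. The helper `udf_type_from_flags` is hypothetical and only illustrates why a single type code is safer than two booleans.

```python
class PythonUdfType(object):
    # Assumed encoding, following the 0/1/2 suggestion in this comment.
    NORMAL_UDF = 0          # row-based udf
    PANDAS_UDF = 1          # vectorized (pandas) udf
    PANDAS_GROUPED_UDF = 2  # grouped vectorized (pandas) udf


def udf_type_from_flags(vectorized, grouped):
    """Hypothetical helper: map the two boolean flags onto one type code.

    Two booleans allow four combinations, but only three are meaningful;
    the single type code cannot express the invalid one at all.
    """
    if grouped and not vectorized:
        raise ValueError("grouped=True requires vectorized=True")
    if grouped:
        return PythonUdfType.PANDAS_GROUPED_UDF
    return PythonUdfType.PANDAS_UDF if vectorized else PythonUdfType.NORMAL_UDF
```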


---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Sure, I'll close this.
    @icexelloss Of course you can open a separate JIRA and another PR. Thanks!


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82810 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82810/testReport)** for PR 19505 at commit [`122a7bc`](https://github.com/apache/spark/commit/122a7bccaff11def2c12cfccdd00244394ed3478).


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82803/
    Test PASSed.


---



[GitHub] spark issue #19505: [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().ap...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    I meant to ask if others agree with the current change as I could not see the ongoing discussion at that time.


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    **[Test build #82805 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82805/testReport)** for PR 19505 at commit [`789e642`](https://github.com/apache/spark/commit/789e642763ab4f59e14137fcc75b514223bc7aae).


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144926703
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2192,67 +2205,82 @@ def pandas_udf(f=None, returnType=StringType()):
         :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object
     
    -    The user-defined function can define one of the following transformations:
    -
    -    1. One or more `pandas.Series` -> A `pandas.Series`
    -
    -       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    -       :meth:`pyspark.sql.DataFrame.select`.
    -       The returnType should be a primitive data type, e.g., `DoubleType()`.
    -       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    -
    -       >>> from pyspark.sql.types import IntegerType, StringType
    -       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -       >>> @pandas_udf(returnType=StringType())
    -       ... def to_upper(s):
    -       ...     return s.str.upper()
    -       ...
    -       >>> @pandas_udf(returnType="integer")
    -       ... def add_one(x):
    -       ...     return x + 1
    -       ...
    -       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -       ...     .show()  # doctest: +SKIP
    -       +----------+--------------+------------+
    -       |slen(name)|to_upper(name)|add_one(age)|
    -       +----------+--------------+------------+
    -       |         8|      JOHN DOE|          22|
    -       +----------+--------------+------------+
    -
    -    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    -
    -       This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
    -       The returnType should be a :class:`StructType` describing the schema of the returned
    -       `pandas.DataFrame`.
    -
    -       >>> df = spark.createDataFrame(
    -       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    -       ...     ("id", "v"))
    -       >>> @pandas_udf(returnType=df.schema)
    -       ... def normalize(pdf):
    -       ...     v = pdf.v
    -       ...     return pdf.assign(v=(v - v.mean()) / v.std())
    -       >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    -       +---+-------------------+
    -       | id|                  v|
    -       +---+-------------------+
    -       |  1|-0.7071067811865475|
    -       |  1| 0.7071067811865475|
    -       |  2|-0.8320502943378437|
    -       |  2|-0.2773500981126146|
    -       |  2| 1.1094003924504583|
    -       +---+-------------------+
    -
    -       .. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
    -                 because it defines a `DataFrame` transformation rather than a `Column`
    -                 transformation.
    -
    -       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
    +    The user-defined function can define the following transformation:
    +
    +    One or more `pandas.Series` -> A `pandas.Series`
    +
    +    This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +    :meth:`pyspark.sql.DataFrame.select`.
    +    The returnType should be a primitive data type, e.g., `DoubleType()`.
    --- End diff --
    
    It will fail at runtime. I'll add tests.
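
For illustration only, here is a sketch of the kind of runtime check that would enforce the same-length requirement on a `pandas.Series` -> `pandas.Series` udf. The helper `apply_scalar_udf` is a hypothetical stand-in written in plain pandas; it is not the code Spark runs, and the exception type is an assumption.

```python
import pandas as pd


def apply_scalar_udf(f, *series):
    """Hypothetical stand-in: call f on the input Series and verify the result length."""
    result = f(*series)
    expected = len(series[0])
    if len(result) != expected:
        raise RuntimeError(
            "Result vector from pandas_udf was not the required length: "
            "expected %d, got %d" % (expected, len(result))
        )
    return result


s = pd.Series([1, 2, 3])

# A udf that preserves the length passes the check.
ok = apply_scalar_udf(lambda x: x + 1, s)

# A udf that drops rows breaks the requirement and fails when it is executed.
try:
    apply_scalar_udf(lambda x: x.head(1), s)
    failed = False
except RuntimeError:
    failed = True
```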


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144915012
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2038,13 +2038,19 @@ def _wrap_function(sc, func, returnType):
                                       sc.pythonVer, broadcast_vars, sc._javaAccumulator)
     
     
    +class PythonUdfType(object):
    --- End diff --
    
    Could you also add descriptions of these three UDF types?
    - NORMAL_UDF: row-based UDFs
    - PANDAS_UDF: single-row vectorized UDFs
    - PANDAS_GROUPED_UDF: grouped vectorized UDFs


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82810/
    Test PASSed.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144912405
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2121,33 +2127,40 @@ def wrapper(*args):
     
             wrapper.func = self.func
             wrapper.returnType = self.returnType
    -        wrapper.vectorized = self.vectorized
    +        wrapper.pythonUdfType = self.pythonUdfType
     
             return wrapper
     
     
    -def _create_udf(f, returnType, vectorized):
    +def _create_udf(f, returnType, pythonUdfType):
     
    -    def _udf(f, returnType=StringType(), vectorized=vectorized):
    -        if vectorized:
    +    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    +        if pythonUdfType == PythonUdfType.PANDAS_UDF:
                 import inspect
                 argspec = inspect.getargspec(f)
                 if len(argspec.args) == 0 and argspec.varargs is None:
                     raise ValueError(
                         "0-arg pandas_udfs are not supported. "
                         "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
                     )
    -        udf_obj = UserDefinedFunction(f, returnType, vectorized=vectorized)
    +        elif pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
    +            import inspect
    +            argspec = inspect.getargspec(f)
    +            if len(argspec.args) != 1 and argspec.varargs is None:
    +                raise ValueError("Only 1-arg pandas_grouped_udfs are supported.")
    +
    +        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
             return udf_obj._wrapped()
     
         # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
    --- End diff --
    
    Nit: update this comment


---



[GitHub] spark issue #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply()...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19505
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82843/
    Test PASSed.


---



[GitHub] spark pull request #19505: [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby()....

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19505#discussion_r144852099
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2121,33 +2127,35 @@ def wrapper(*args):
     
             wrapper.func = self.func
             wrapper.returnType = self.returnType
    -        wrapper.vectorized = self.vectorized
    +        wrapper.pythonUdfType = self.pythonUdfType
     
             return wrapper
     
     
    -def _create_udf(f, returnType, vectorized):
    +def _create_udf(f, returnType, pythonUdfType):
     
    -    def _udf(f, returnType=StringType(), vectorized=vectorized):
    -        if vectorized:
    +    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    +        if pythonUdfType == PythonUdfType.PANDAS_UDF \
    +           or pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
                 import inspect
                 argspec = inspect.getargspec(f)
                 if len(argspec.args) == 0 and argspec.varargs is None:
                     raise ValueError(
                         "0-arg pandas_udfs are not supported. "
                         "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    --- End diff --
    
    Hmm, for pandas_grouped_udfs, should the number of args be exactly 1?


---
