You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by icexelloss <gi...@git.apache.org> on 2018/01/17 18:43:22 UTC

[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

GitHub user icexelloss opened a pull request:

    https://github.com/apache/spark/pull/20295

    [SPARK-23011] Support alternative function form with group aggregate pandas UDF

    ## What changes were proposed in this pull request?
    
    This PR proposes to support an alternative function from with group aggregate pandas UDF.
    
    The current form:
    ```
    def foo(pdf):
        return ...
    ```
    Takes a single arg that is a pandas DataFrame.
    
    With this PR, an alternative form is supported:
    ```
    def foo(key, pdf):
        return ...
    ```
    The alternative form takes two argument - a tuple that presents the grouping key, and a pandas DataFrame represents the data.
    
    ## How was this patch tested?
    
    GroupbyApplyTests


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/icexelloss/spark SPARK-23011-groupby-apply-key

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20295
    
----
commit 38195ac7f0b9e1227cfc1e407de47e276b3fc43f
Author: Li Jin <ic...@...>
Date:   2018-01-17T18:37:18Z

    Initial commit. Test passes.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88048/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011][SQL][PYTHON] Support alternative fu...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r214796189
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4588,6 +4613,80 @@ def test_timestamp_dst(self):
             result = df.groupby('time').apply(foo_udf).sort('time')
             self.assertPandasEqual(df.toPandas(), result.toPandas())
     
    +    def test_udf_with_key(self):
    +        from pyspark.sql.functions import pandas_udf, col, PandasUDFType
    +        df = self.data
    +        pdf = df.toPandas()
    +
    +        def foo1(key, pdf):
    +            import numpy as np
    +            assert type(key) == tuple
    +            assert type(key[0]) == np.int64
    +
    +            return pdf.assign(v1=key[0],
    +                              v2=pdf.v * key[0],
    +                              v3=pdf.v * pdf.id,
    +                              v4=pdf.v * pdf.id.mean())
    +
    +        def foo2(key, pdf):
    +            import numpy as np
    +            assert type(key) == tuple
    +            assert type(key[0]) == np.int64
    +            assert type(key[1]) == np.int32
    +
    +            return pdf.assign(v1=key[0],
    +                              v2=key[1],
    +                              v3=pdf.v * key[0],
    +                              v4=pdf.v + key[1])
    +
    +        def foo3(key, pdf):
    +            assert type(key) == tuple
    +            assert len(key) == 0
    +            return pdf.assign(v1=pdf.v * pdf.id)
    +
    +        # v2 is int because numpy.int64 * pd.Series<int32> results in pd.Series<int32>
    +        # v3 is long because pd.Series<int64> * pd.Series<int32> results in pd.Series<int64>
    +        udf1 = pandas_udf(
    +            foo1,
    +            'id long, v int, v1 long, v2 int, v3 long, v4 double',
    +            PandasUDFType.GROUPED_MAP)
    +
    +        udf2 = pandas_udf(
    +            foo2,
    +            'id long, v int, v1 long, v2 int, v3 int, v4 int',
    +            PandasUDFType.GROUPED_MAP)
    +
    +        udf3 = pandas_udf(
    +            foo3,
    +            'id long, v int, v1 long',
    +            PandasUDFType.GROUPED_MAP)
    +
    +        # Test groupby column
    +        result1 = df.groupby('id').apply(udf1).sort('id', 'v').toPandas()
    +        expected1 = pdf.groupby('id')\
    +            .apply(lambda x: udf1.func((x.id.iloc[0],), x))\
    +            .sort_values(['id', 'v']).reset_index(drop=True)
    +        self.assertPandasEqual(expected1, result1)
    +
    +        # Test groupby expression
    +        result2 = df.groupby(df.id % 2).apply(udf1).sort('id', 'v').toPandas()
    +        expected2 = pdf.groupby(pdf.id % 2)\
    +            .apply(lambda x: udf1.func((x.id.iloc[0] % 2,), x))\
    +            .sort_values(['id', 'v']).reset_index(drop=True)
    +        self.assertPandasEqual(expected2, result2)
    +
    +        # Test complex groupby
    +        result3 = df.groupby(df.id, df.v % 2).apply(udf2).sort('id', 'v').toPandas()
    --- End diff --
    
    For end users, the misuse of this alternative functions could be common. For example, do we issue an appropriate error in the following cases?
    
    - result3 = df.groupby(df.id).apply(udf2).sort('id', 'v').toPandas()
    - result3 = df.groupby(df.id, df.v % 2, df.id).apply(udf2).sort('id', 'v').toPandas()


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87968 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87968/testReport)** for PR 20295 at commit [`722ed50`](https://github.com/apache/spark/commit/722ed508c280e63fb89cbdc891061c3c1928bf74).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86836/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171297003
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2253,6 +2253,30 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            |  2| 1.1094003924504583|
            +---+-------------------+
     
    +       Alternatively, the user can define a function that takes two arguments.
    +       In this case, the grouping key will be passed as the first argument and the data will
    +       be passed as the second argument. The grouping key will be passed as a tuple of numpy
    +       data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
    +       as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
    +       This is useful when the user doesn't want to hardcode grouping key in the function.
    +
    +       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +       >>> df = spark.createDataFrame(
    +       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    +       ...     ("id", "v"))  # doctest: +SKIP
    +       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
    --- End diff --
    
    nit: `GROUP_MAP` -> `GROUPED_MAP`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/900/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86836 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86836/testReport)** for PR 20295 at commit [`2399b77`](https://github.com/apache/spark/commit/2399b770551bcc16721af0199971b5b66536707b).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Let me experiment with new serialization approach. Will update here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [WIP][SPARK-23011] Support alternative function f...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r162981806
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -267,13 +267,13 @@ def load_stream(self, stream):
             """
             Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
             """
    -        from pyspark.sql.types import _check_dataframe_localize_timestamps
    +        from pyspark.sql.types import _check_series_localize_timestamps
             import pyarrow as pa
             reader = pa.open_stream(stream)
             for batch in reader:
                 # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
    -            pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
    -            yield [c for _, c in pdf.iteritems()]
    +            yield [_check_series_localize_timestamps(c.to_pandas(), self._timezone)
    +                   for c in pa.Table.from_batches([batch]).itercolumns()]
    --- End diff --
    
    I actually don't know what the comment above means. @BryanCutler do you remember?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87736 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87736/testReport)** for PR 20295 at commit [`9ed3779`](https://github.com/apache/spark/commit/9ed3779b665c90e5bb25bc6636997a4b080c3d34).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88020/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172374978
  
    --- Diff: python/pyspark/worker.py ---
    @@ -91,10 +92,16 @@ def verify_result_length(*a):
     
     
     def wrap_grouped_map_pandas_udf(f, return_type):
    -    def wrapped(*series):
    +    def wrapped(key_series, value_series):
             import pandas as pd
    +        argspec = inspect.getargspec(f)
    --- End diff --
    
    should this also do `getfullargspec` for py3 like in udf.py?
    maybe it would be useful to put a function in util.py, what do you guys think?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011][SQL][PYTHON] Support alternative fu...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172909675
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -35,24 +37,29 @@ def _wrap_function(sc, func, returnType):
                                       sc.pythonVer, broadcast_vars, sc._javaAccumulator)
     
     
    +def _get_argspec(f):
    --- End diff --
    
    Make sense. Moved to pyspark.util.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86604 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86604/testReport)** for PR 20295 at commit [`c7fccde`](https://github.com/apache/spark/commit/c7fccde701723c8808b095586e987ad02fc171c7).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87129 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87129/testReport)** for PR 20295 at commit [`fef4989`](https://github.com/apache/spark/commit/fef49892f6c52c0d6313d10e8d030dc8f85a636b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    > @BryanCutler yeah I think kwargs is another option. But I think the API in this PR is more consistent with the exsiting APIs though.
    
    Yeah if it's consistent with other APIs then sounds fine with me.  My concern was in giving the user too many options that it starts to get confusing to make UDFs. If it's a familiar API then that probably won't be the case.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87968 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87968/testReport)** for PR 20295 at commit [`722ed50`](https://github.com/apache/spark/commit/722ed508c280e63fb89cbdc891061c3c1928bf74).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/918/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87129 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87129/testReport)** for PR 20295 at commit [`fef4989`](https://github.com/apache/spark/commit/fef49892f6c52c0d6313d10e8d030dc8f85a636b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87490 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87490/testReport)** for PR 20295 at commit [`9ed3779`](https://github.com/apache/spark/commit/9ed3779b665c90e5bb25bc6636997a4b080c3d34).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172249968
  
    --- Diff: python/pyspark/worker.py ---
    @@ -149,18 +156,30 @@ def read_udfs(pickleSer, infile, eval_type):
         num_udfs = read_int(infile)
         udfs = {}
         call_udf = []
    -    for i in range(num_udfs):
    +    mapper_str = ""
    +    if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    +        # Create function like this:
    +        #   lambda a: f([a[0]], [a[0], a[1]])
    +        assert num_udfs == 1
    --- End diff --
    
    Added. Hopefully it's clear enough?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Rebased


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88048 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88048/testReport)** for PR 20295 at commit [`59bdf20`](https://github.com/apache/spark/commit/59bdf200c6b922d18a64db0c7a67f9ec069c67ba).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    For https://github.com/apache/spark/pull/20295#issuecomment-360297123, I am fine without new serialization protocol actually. I didn't have a strong preference there because I wasn't sure if it's worth - complexity vs actual gain vaguely and seems that's now clarified there. I am okay with the current approach.
    
    @BryanCutler, I think the intention here is to follow other few APIs and `gapply` in R. I guess you meant the length and metadata stuff by "an optional kwargs to each pandas_udf to deal with 0-param udfs" if I remember correctly. I think that's slightly different because here the motivation is to provide consistent support similarly with other APIs vs the `kwargs` thing sounds pretty few concept to me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87463 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87463/testReport)** for PR 20295 at commit [`cf45827`](https://github.com/apache/spark/commit/cf458279c4630ed0fb565b6836d10b909185309e).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86606/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011][SQL][PYTHON] Support alternative fu...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r214797748
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4588,6 +4613,80 @@ def test_timestamp_dst(self):
             result = df.groupby('time').apply(foo_udf).sort('time')
             self.assertPandasEqual(df.toPandas(), result.toPandas())
     
    +    def test_udf_with_key(self):
    +        from pyspark.sql.functions import pandas_udf, col, PandasUDFType
    +        df = self.data
    +        pdf = df.toPandas()
    +
    +        def foo1(key, pdf):
    +            import numpy as np
    +            assert type(key) == tuple
    +            assert type(key[0]) == np.int64
    +
    +            return pdf.assign(v1=key[0],
    +                              v2=pdf.v * key[0],
    +                              v3=pdf.v * pdf.id,
    +                              v4=pdf.v * pdf.id.mean())
    +
    +        def foo2(key, pdf):
    +            import numpy as np
    +            assert type(key) == tuple
    +            assert type(key[0]) == np.int64
    +            assert type(key[1]) == np.int32
    +
    +            return pdf.assign(v1=key[0],
    +                              v2=key[1],
    +                              v3=pdf.v * key[0],
    +                              v4=pdf.v + key[1])
    +
    +        def foo3(key, pdf):
    +            assert type(key) == tuple
    +            assert len(key) == 0
    +            return pdf.assign(v1=pdf.v * pdf.id)
    +
    +        # v2 is int because numpy.int64 * pd.Series<int32> results in pd.Series<int32>
    +        # v3 is long because pd.Series<int64> * pd.Series<int32> results in pd.Series<int64>
    +        udf1 = pandas_udf(
    +            foo1,
    +            'id long, v int, v1 long, v2 int, v3 long, v4 double',
    +            PandasUDFType.GROUPED_MAP)
    +
    +        udf2 = pandas_udf(
    +            foo2,
    +            'id long, v int, v1 long, v2 int, v3 int, v4 int',
    +            PandasUDFType.GROUPED_MAP)
    +
    +        udf3 = pandas_udf(
    +            foo3,
    +            'id long, v int, v1 long',
    +            PandasUDFType.GROUPED_MAP)
    +
    +        # Test groupby column
    +        result1 = df.groupby('id').apply(udf1).sort('id', 'v').toPandas()
    +        expected1 = pdf.groupby('id')\
    +            .apply(lambda x: udf1.func((x.id.iloc[0],), x))\
    +            .sort_values(['id', 'v']).reset_index(drop=True)
    +        self.assertPandasEqual(expected1, result1)
    +
    +        # Test groupby expression
    +        result2 = df.groupby(df.id % 2).apply(udf1).sort('id', 'v').toPandas()
    +        expected2 = pdf.groupby(pdf.id % 2)\
    +            .apply(lambda x: udf1.func((x.id.iloc[0] % 2,), x))\
    +            .sort_values(['id', 'v']).reset_index(drop=True)
    +        self.assertPandasEqual(expected2, result2)
    +
    +        # Test complex groupby
    +        result3 = df.groupby(df.id, df.v % 2).apply(udf2).sort('id', 'v').toPandas()
    --- End diff --
    
    In that case, any error in this case will be thrown as is from worker.py side which is read and redirect to users end via JVM. For instance:
    
    ```python
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    def test_func(key, pdf):
        assert len(key) == 0
        return pdf
    
    udf1 = pandas_udf(test_func, "id long, v1 double", PandasUDFType.GROUPED_MAP)
    spark.range(10).groupby('id').apply(udf1).sort('id').show()
    ```
    
    ```
    18/09/04 14:22:52 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/.../spark/python/pyspark/sql/dataframe.py", line 378, in show
        print(self._jdf.showString(n, 20, vertical))
      File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
      File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/.../spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o68.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 353, in main
        process()
      File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 348, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 242, in <lambda>
        func = lambda _, it: map(mapper, it)
      File "<string>", line 1, in <lambda>
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 110, in wrapped
        result = f(key, pd.concat(value_series, axis=1))
      File "/.../spark/python/pyspark/util.py", line 99, in wrapper
        return f(*args, **kwargs)
      File "<stdin>", line 2, in test_func
    AssertionError
    
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:418)
    	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
    	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:372)
    	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
    	at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:628)
    	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1427)
    	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1424)
    	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:48)
    	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    	at org.apache.spark.scheduler.Task.run(Task.scala:128)
    	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:367)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1348)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:373)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    
    Driver stacktrace:
    	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1822)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1810)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1809)
    	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1809)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    	at scala.Option.foreach(Option.scala:257)
    	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2043)
    	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1992)
    	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1981)
    	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
    	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1029)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1011)
    	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1433)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1420)
    	at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:207)
    	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
    	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
    	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
    	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    	at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
    	at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
    	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
    	at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at py4j.Gateway.invoke(Gateway.java:282)
    	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 353, in main
        process()
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 348, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 242, in <lambda>
        func = lambda _, it: map(mapper, it)
      File "<string>", line 1, in <lambda>
      File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 110, in wrapped
        result = f(key, pd.concat(value_series, axis=1))
      File "/.../spark/python/pyspark/util.py", line 99, in wrapper
        return f(*args, **kwargs)
      File "<stdin>", line 2, in test_func
    AssertionError
    
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:418)
    	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
    	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
    	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:372)
    	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
    	at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:628)
    	at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1427)
    	at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1424)
    	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:48)
    	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    	at org.apache.spark.scheduler.Task.run(Task.scala:128)
    	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$7.apply(Executor.scala:367)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1348)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:373)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	... 1 more
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88025 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88025/testReport)** for PR 20295 at commit [`c74ed05`](https://github.com/apache/spark/commit/c74ed05dec5c9b2521f5e37cf7e01e5ce873c779).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    How do we turn a single group column to a series? just repeat the group column?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86836 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86836/testReport)** for PR 20295 at commit [`2399b77`](https://github.com/apache/spark/commit/2399b770551bcc16721af0199971b5b66536707b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r167501868
  
    --- Diff: python/pyspark/sql/types.py ---
    @@ -1694,6 +1694,13 @@ def from_arrow_schema(arrow_schema):
              for field in arrow_schema])
     
     
    +def _check_series_convert_date(series, data_type):
    --- End diff --
    
    Do we need some documents here as well?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171311096
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ---
    @@ -75,28 +76,66 @@ case class FlatMapGroupsInPandasExec(
         val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
         val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
         val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
    -    val argOffsets = Array((0 until (child.output.length - groupingAttributes.length)).toArray)
    -    val schema = StructType(child.schema.drop(groupingAttributes.length))
         val sessionLocalTimeZone = conf.sessionLocalTimeZone
         val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
     
    +    // Deduplicate the grouping attributes.
    +    // If a grouping attribute also appears in data attributes, then we don't need to send the
    +    // grouping attribute to Python worker. If a grouping attribute is not in data attributes,
    +    // then we need to send this grouping attribute to python worker.
    +    //
    +    // We use argOffsets to distinguish grouping attributes and data attributes as following:
    +    //
    +    // argOffsets[0] is the length of grouping attributes
    +    // argOffsets[1 .. argOffsets[0]+1] is the arg offsets for grouping attributes
    +    // argOffsets[argOffsets[0]+1 .. ] is the arg offsets for data attributes
    +
    +    val dupGroupingIndices = new ArrayBuffer[Int]
    +    val groupingArgOffsets = new ArrayBuffer[Int]
    +    val extraGroupingAttributes = new ArrayBuffer[Attribute]
    +
    +    val dataAttributes = child.output.drop(groupingAttributes.length)
    +    groupingAttributes.foreach { attribute =>
    +      val index = dataAttributes.indexWhere(
    +        childAttribute => attribute.semanticEquals(childAttribute))
    +      dupGroupingIndices += index
    +    }
    +
    +    val extraGroupingSize = dupGroupingIndices.count(_ == -1)
    +    (groupingAttributes zip dupGroupingIndices).foreach {
    --- End diff --
    
    nit -> `groupingAttributes.zip(dupGroupingIndices)`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    cc @ueshin @HyukjinKwon @cloud-fan @viirya 
    
    This PR implements discussion here https://github.com/apache/spark/pull/20211#pullrequestreview-87657832. There are more refinement needs to be done but I'd like to get some early feedback whether this approach looks good  in general.
    
    The general idea is to pass grouping columns as extra columns to the python worker and to use `argOffsets` to specify which columns are grouping columns. Finally, we convert a grouping columns to a single tuple before enter user function. This is slightly inefficient because grouping columns always have the same value for each group. But I think this is OK because grouping columns should be relatively small comparing to the entire DataFrame.
    
    I can also implement some kind of de duplicate logic in `FlatMapGroupsInPandasExec`, however that would require creating another `UnsafeProjection`, I am not sure if it's worth it performance wise. 
    
    WDYT?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/393/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86606 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86606/testReport)** for PR 20295 at commit [`cda97f1`](https://github.com/apache/spark/commit/cda97f142285bc01a2da96765d3703d37b4cb802).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86834/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [WIP][SPARK-23011] Support alternative function f...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r163081467
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -267,13 +267,13 @@ def load_stream(self, stream):
             """
             Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
             """
    -        from pyspark.sql.types import _check_dataframe_localize_timestamps
    +        from pyspark.sql.types import _check_series_localize_timestamps
             import pyarrow as pa
             reader = pa.open_stream(stream)
             for batch in reader:
                 # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
    -            pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
    -            yield [c for _, c in pdf.iteritems()]
    +            yield [_check_series_localize_timestamps(c.to_pandas(), self._timezone)
    +                   for c in pa.Table.from_batches([batch]).itercolumns()]
    --- End diff --
    
    Yeah, the note was because prev we iterated over Arrow columns and converted each to a Series, then changed to convert an Arrow batch to DataFrame and then iterated over DataFrame columns to get a Series.  I wasn't sure if there might be a perf decrease, so I left the note but I'm not sure why it wasn't done like the above in the first place - seems like it would be just as good as the original.  Anyway, yeah the note can be removed now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171307853
  
    --- Diff: python/pyspark/worker.py ---
    @@ -149,18 +156,30 @@ def read_udfs(pickleSer, infile, eval_type):
         num_udfs = read_int(infile)
         udfs = {}
         call_udf = []
    -    for i in range(num_udfs):
    +    mapper_str = ""
    +    if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    +        # Create function like this:
    +        #   lambda a: f([a[0]], [a[0], a[1]])
    +        assert num_udfs == 1
    --- End diff --
    
    @icexelloss, would you mind if I ask to leave some comments here too to elaborate this assert and the offset used to distinguishing grouping columns? linking or pointing `FlatMapGroupsInPandasExec` could also be fine.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172226094
  
    --- Diff: python/pyspark/sql/types.py ---
    @@ -1725,6 +1737,29 @@ def _get_local_timezone():
         return os.environ.get('TZ', 'dateutil/:')
     
     
    +def _check_series_localize_timestamps(s, timezone):
    +    """
    +    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone.
    +
    +    If the input series is not a timestamp series, then the same series is returned. If the input
    +    series is a timestamp series, then a converted series is returned.
    +
    +    :param s: pandas.Series
    +    :param timezone: the timezone to convert. if None then use local timezone
    +    :return pandas.Series that have been converted to tz-naive
    +    """
    +    from pyspark.sql.utils import require_minimum_pandas_version
    +    require_minimum_pandas_version()
    +
    +    from pandas.api.types import is_datetime64tz_dtype
    --- End diff --
    
    This function itself `_check_series_localize_timestamps` doesn't have a unit test, but it's called in various arrow/pandas_udf tests related to timestamps.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86834 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86834/testReport)** for PR 20295 at commit [`2668251`](https://github.com/apache/spark/commit/266825167f0bf308c0b4213b1ef718a930a47c2b).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86604/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    @HyukjinKwon @felixcheung Thank you for the comments. I will take a look and hopefully address them today.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Yep, that's correct.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88025 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88025/testReport)** for PR 20295 at commit [`c74ed05`](https://github.com/apache/spark/commit/c74ed05dec5c9b2521f5e37cf7e01e5ce873c779).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87968/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88025/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172225237
  
    --- Diff: python/pyspark/worker.py ---
    @@ -149,18 +156,30 @@ def read_udfs(pickleSer, infile, eval_type):
         num_udfs = read_int(infile)
         udfs = {}
         call_udf = []
    -    for i in range(num_udfs):
    +    mapper_str = ""
    +    if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    +        # Create function like this:
    +        #   lambda a: f([a[0]], [a[0], a[1]])
    +        assert num_udfs == 1
             arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
    -        udfs['f%d' % i] = udf
    -        args = ["a[%d]" % o for o in arg_offsets]
    -        call_udf.append("f%d(%s)" % (i, ", ".join(args)))
    -    # Create function like this:
    -    #   lambda a: (f0(a0), f1(a1, a2), f2(a3))
    -    # In the special case of a single UDF this will return a single result rather
    -    # than a tuple of results; this is the format that the JVM side expects.
    -    mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
    -    mapper = eval(mapper_str, udfs)
    +        udfs['f'] = udf
    +        split_offset = arg_offsets[0] + 1
    +        arg0 = ["a[%d]" % o for o in arg_offsets[1: split_offset]]
    +        arg1 = ["a[%d]" % o for o in arg_offsets[split_offset:]]
    +        mapper_str = ("lambda a: f([%s], [%s])" % (", ".join(arg0), ", ".join(arg1)))
    +    else:
    +        # Create function like this:
    +        #   lambda a: (f0(a[0]), f1(a[1], a[2]), f2(a[3]))
    +        # In the special case of a single UDF this will return a single result rather
    +        # than a tuple of results; this is the format that the JVM side expects.
    +        for i in range(num_udfs):
    +            arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
    +            udfs['f%d' % i] = udf
    +            args = ["a[%d]" % o for o in arg_offsets]
    +            call_udf.append("f%d(%s)" % (i, ", ".join(args)))
    +            mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
    --- End diff --
    
    Aha good catch! I was surprised that this doesn't get caught in tests and found out the logic produced the same results unfortunately :(


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011][SQL][PYTHON] Support alternative fu...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20295


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172250093
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ---
    @@ -75,28 +76,66 @@ case class FlatMapGroupsInPandasExec(
         val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
         val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
         val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
    -    val argOffsets = Array((0 until (child.output.length - groupingAttributes.length)).toArray)
    -    val schema = StructType(child.schema.drop(groupingAttributes.length))
         val sessionLocalTimeZone = conf.sessionLocalTimeZone
         val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
     
    +    // Deduplicate the grouping attributes.
    +    // If a grouping attribute also appears in data attributes, then we don't need to send the
    +    // grouping attribute to Python worker. If a grouping attribute is not in data attributes,
    +    // then we need to send this grouping attribute to python worker.
    +    //
    +    // We use argOffsets to distinguish grouping attributes and data attributes as following:
    +    //
    +    // argOffsets[0] is the length of grouping attributes
    +    // argOffsets[1 .. argOffsets[0]+1] is the arg offsets for grouping attributes
    +    // argOffsets[argOffsets[0]+1 .. ] is the arg offsets for data attributes
    +
    +    val dupGroupingIndices = new ArrayBuffer[Int]
    +    val groupingArgOffsets = new ArrayBuffer[Int]
    +    val extraGroupingAttributes = new ArrayBuffer[Attribute]
    +
    +    val dataAttributes = child.output.drop(groupingAttributes.length)
    +    groupingAttributes.foreach { attribute =>
    +      val index = dataAttributes.indexWhere(
    +        childAttribute => attribute.semanticEquals(childAttribute))
    +      dupGroupingIndices += index
    +    }
    --- End diff --
    
    Fixed. Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1283/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88049 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88049/testReport)** for PR 20295 at commit [`d51bc2e`](https://github.com/apache/spark/commit/d51bc2e9d0977177e0aff0aeab242376981b6b0d).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1328/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88020 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88020/testReport)** for PR 20295 at commit [`c74ed05`](https://github.com/apache/spark/commit/c74ed05dec5c9b2521f5e37cf7e01e5ce873c779).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    @HyukjinKwon @ueshin This is ready for review. I addressed the comments so far.
    
    @BryanCutler yeah I think kwargs is another option. But I think the API in this PR is more consistent with the exsiting APIs though.
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    @cloud-fan Currently I sent group columns along with the extra data column. For example, if the original DataFrame has `id, v` and group column is `id`, the current implementation in this PR will send three series `id, id, v` to the python worker and send an `argOffsets` of `[1, 2]` to specify the data columns are `id, v`. The first value of the group column is used as the group key, because values in a group column are equal.
    
    I implemented it this way because it doesn't change the existing serialization protocol. Alternatively, we can implement a new serialization protocol for GROUP_MAP eval type, i.e, instead of sending an arrow batch, we could send a group row and then an arrow batch. What do you think?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87480 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87480/testReport)** for PR 20295 at commit [`9ed3779`](https://github.com/apache/spark/commit/9ed3779b665c90e5bb25bc6636997a4b080c3d34).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    cc @BryanCutler 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86773 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86773/testReport)** for PR 20295 at commit [`edb77dc`](https://github.com/apache/spark/commit/edb77dc06ba0c675ab2001548d620d5938cd26c0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    gentle ping @HyukjinKwon @ueshin  


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    To me, seems roughly fine.
    
    > Alternatively, we can implement a new serialization protocol for GROUP_MAP eval type, i.e, instead of sending an arrow batch, we could send a group row and then an arrow batch.
    
    I don't have a strong preference on this.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1118/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87455 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87455/testReport)** for PR 20295 at commit [`cf45827`](https://github.com/apache/spark/commit/cf458279c4630ed0fb565b6836d10b909185309e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: wip: [SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86286 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86286/testReport)** for PR 20295 at commit [`38195ac`](https://github.com/apache/spark/commit/38195ac7f0b9e1227cfc1e407de47e276b3fc43f).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86773 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86773/testReport)** for PR 20295 at commit [`edb77dc`](https://github.com/apache/spark/commit/edb77dc06ba0c675ab2001548d620d5938cd26c0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    @HyukjinKwon Thanks for the comment. I will continue with the current approach unless objection raises. I will work on comments and refinements in the next day or two.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86773/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86837 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86837/testReport)** for PR 20295 at commit [`8f0782c`](https://github.com/apache/spark/commit/8f0782c07f4c6f02610918e6d4edc5907f7d6aaa).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86837/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172250020
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ---
    @@ -75,28 +76,66 @@ case class FlatMapGroupsInPandasExec(
         val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
         val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
         val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
    -    val argOffsets = Array((0 until (child.output.length - groupingAttributes.length)).toArray)
    -    val schema = StructType(child.schema.drop(groupingAttributes.length))
         val sessionLocalTimeZone = conf.sessionLocalTimeZone
         val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
     
    +    // Deduplicate the grouping attributes.
    +    // If a grouping attribute also appears in data attributes, then we don't need to send the
    +    // grouping attribute to Python worker. If a grouping attribute is not in data attributes,
    +    // then we need to send this grouping attribute to python worker.
    +    //
    +    // We use argOffsets to distinguish grouping attributes and data attributes as following:
    +    //
    +    // argOffsets[0] is the length of grouping attributes
    +    // argOffsets[1 .. argOffsets[0]+1] is the arg offsets for grouping attributes
    +    // argOffsets[argOffsets[0]+1 .. ] is the arg offsets for data attributes
    +
    +    val dupGroupingIndices = new ArrayBuffer[Int]
    +    val groupingArgOffsets = new ArrayBuffer[Int]
    +    val extraGroupingAttributes = new ArrayBuffer[Attribute]
    +
    +    val dataAttributes = child.output.drop(groupingAttributes.length)
    +    groupingAttributes.foreach { attribute =>
    +      val index = dataAttributes.indexWhere(
    +        childAttribute => attribute.semanticEquals(childAttribute))
    +      dupGroupingIndices += index
    +    }
    +
    +    val extraGroupingSize = dupGroupingIndices.count(_ == -1)
    +    (groupingAttributes zip dupGroupingIndices).foreach {
    --- End diff --
    
    Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Hey @HyukjinKwon @ueshin could you please take a look at this? Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86290 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86290/testReport)** for PR 20295 at commit [`7ce3fa7`](https://github.com/apache/spark/commit/7ce3fa79f8f63d4320c386ed81fa0b57d6bb7a91).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171466325
  
    --- Diff: python/pyspark/sql/types.py ---
    @@ -1725,6 +1737,29 @@ def _get_local_timezone():
         return os.environ.get('TZ', 'dateutil/:')
     
     
    +def _check_series_localize_timestamps(s, timezone):
    +    """
    +    Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone.
    +
    +    If the input series is not a timestamp series, then the same series is returned. If the input
    +    series is a timestamp series, then a converted series is returned.
    +
    +    :param s: pandas.Series
    +    :param timezone: the timezone to convert. if None then use local timezone
    +    :return pandas.Series that have been converted to tz-naive
    +    """
    +    from pyspark.sql.utils import require_minimum_pandas_version
    +    require_minimum_pandas_version()
    +
    +    from pandas.api.types import is_datetime64tz_dtype
    --- End diff --
    
    do we have tests for tese in tests.py?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88020 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88020/testReport)** for PR 20295 at commit [`c74ed05`](https://github.com/apache/spark/commit/c74ed05dec5c9b2521f5e37cf7e01e5ce873c779).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87490/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011][SQL][PYTHON] Support alternative fu...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r214795846
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -4588,6 +4613,80 @@ def test_timestamp_dst(self):
             result = df.groupby('time').apply(foo_udf).sort('time')
             self.assertPandasEqual(df.toPandas(), result.toPandas())
     
    +    def test_udf_with_key(self):
    +        from pyspark.sql.functions import pandas_udf, col, PandasUDFType
    +        df = self.data
    +        pdf = df.toPandas()
    +
    +        def foo1(key, pdf):
    +            import numpy as np
    +            assert type(key) == tuple
    +            assert type(key[0]) == np.int64
    +
    +            return pdf.assign(v1=key[0],
    +                              v2=pdf.v * key[0],
    +                              v3=pdf.v * pdf.id,
    +                              v4=pdf.v * pdf.id.mean())
    +
    +        def foo2(key, pdf):
    +            import numpy as np
    +            assert type(key) == tuple
    +            assert type(key[0]) == np.int64
    +            assert type(key[1]) == np.int32
    +
    +            return pdf.assign(v1=key[0],
    +                              v2=key[1],
    +                              v3=pdf.v * key[0],
    +                              v4=pdf.v + key[1])
    +
    +        def foo3(key, pdf):
    +            assert type(key) == tuple
    +            assert len(key) == 0
    +            return pdf.assign(v1=pdf.v * pdf.id)
    +
    +        # v2 is int because numpy.int64 * pd.Series<int32> results in pd.Series<int32>
    +        # v3 is long because pd.Series<int64> * pd.Series<int32> results in pd.Series<int64>
    +        udf1 = pandas_udf(
    +            foo1,
    +            'id long, v int, v1 long, v2 int, v3 long, v4 double',
    +            PandasUDFType.GROUPED_MAP)
    +
    +        udf2 = pandas_udf(
    +            foo2,
    +            'id long, v int, v1 long, v2 int, v3 int, v4 int',
    +            PandasUDFType.GROUPED_MAP)
    +
    +        udf3 = pandas_udf(
    +            foo3,
    +            'id long, v int, v1 long',
    +            PandasUDFType.GROUPED_MAP)
    +
    +        # Test groupby column
    +        result1 = df.groupby('id').apply(udf1).sort('id', 'v').toPandas()
    +        expected1 = pdf.groupby('id')\
    +            .apply(lambda x: udf1.func((x.id.iloc[0],), x))\
    +            .sort_values(['id', 'v']).reset_index(drop=True)
    +        self.assertPandasEqual(expected1, result1)
    +
    +        # Test groupby expression
    +        result2 = df.groupby(df.id % 2).apply(udf1).sort('id', 'v').toPandas()
    +        expected2 = pdf.groupby(pdf.id % 2)\
    +            .apply(lambda x: udf1.func((x.id.iloc[0] % 2,), x))\
    +            .sort_values(['id', 'v']).reset_index(drop=True)
    +        self.assertPandasEqual(expected2, result2)
    +
    +        # Test complex groupby
    +        result3 = df.groupby(df.id, df.v % 2).apply(udf2).sort('id', 'v').toPandas()
    --- End diff --
    
    Any negative test case when the number of columns specified in groupby is different from the definition of udf (foo2)?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88050 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88050/testReport)** for PR 20295 at commit [`4b61f52`](https://github.com/apache/spark/commit/4b61f52497e9f356015ae7905fd08da785e90f08).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88050/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171465908
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2253,6 +2253,30 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            |  2| 1.1094003924504583|
            +---+-------------------+
     
    +       Alternatively, the user can define a function that takes two arguments.
    +       In this case, the grouping key will be passed as the first argument and the data will
    +       be passed as the second argument. The grouping key will be passed as a tuple of numpy
    +       data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
    +       as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
    +       This is useful when the user doesn't want to hardcode grouping key in the function.
    +
    +       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +       >>> df = spark.createDataFrame(
    +       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    +       ...     ("id", "v"))  # doctest: +SKIP
    +       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
    +       ... def mean_udf(key, pdf):
    +       ...     # key is a tuple of one numpy.int64, which is the value
    +       ...     # of 'id' for the current group
    +       ...     return pd.DataFrame([key + (pdf.v.mean(),)])
    +       >>> df.groupby('id').apply(mean_udf).show()  #doctest: +SKIP
    --- End diff --
    
    why skip all of these btw? why not run them so they can be tested?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87455 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87455/testReport)** for PR 20295 at commit [`cf45827`](https://github.com/apache/spark/commit/cf458279c4630ed0fb565b6836d10b909185309e).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: wip: [SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86286 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86286/testReport)** for PR 20295 at commit [`38195ac`](https://github.com/apache/spark/commit/38195ac7f0b9e1227cfc1e407de47e276b3fc43f).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Thanks all for review! @HyukjinKwon do you mean this doc?
    https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md#pyspark-usage-guide-for-pandas-with-apache-arrow
    
    I can update it now or we can update later in batch before 2.4 release. What do you prefer?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Resolved conflict and addressed @ueshin's comment. (Btw, I am fine with merging after Spark 2.3 RC passes, as that seems to be the priority now, just want to make sure this PR doesn't sit forever...)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [WIP][SPARK-23011] Support alternative function f...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r162912985
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -267,13 +267,13 @@ def load_stream(self, stream):
             """
             Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
             """
    -        from pyspark.sql.types import _check_dataframe_localize_timestamps
    +        from pyspark.sql.types import _check_series_localize_timestamps
             import pyarrow as pa
             reader = pa.open_stream(stream)
             for batch in reader:
                 # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
    -            pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
    -            yield [c for _, c in pdf.iteritems()]
    +            yield [_check_series_localize_timestamps(c.to_pandas(), self._timezone)
    +                   for c in pa.Table.from_batches([batch]).itercolumns()]
    --- End diff --
    
    Maybe we can remove the comment above (`# NOTE: ...`) ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1356/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [WIP][SPARK-23011] Support alternative function f...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r164347701
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -54,7 +54,7 @@ def _create_udf(f, returnType, evalType):
                     "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
                 )
     
    -        if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and len(argspec.args) != 1:
    +        if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and len(argspec.args) not in (1, 2):
                 raise ValueError(
                     "Invalid function: pandas_udfs with function type GROUP_MAP "
                     "must take a single arg that is a pandas DataFrame."
    --- End diff --
    
    We should update the error message here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87736/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87490 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87490/testReport)** for PR 20295 at commit [`9ed3779`](https://github.com/apache/spark/commit/9ed3779b665c90e5bb25bc6636997a4b080c3d34).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Yup. Maybe we could do that when we are close to 2.4.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    @icexelloss Could you annotate `[SQL][PYTHON]` in the pr title please?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172249785
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2253,6 +2253,30 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            |  2| 1.1094003924504583|
            +---+-------------------+
     
    +       Alternatively, the user can define a function that takes two arguments.
    +       In this case, the grouping key will be passed as the first argument and the data will
    +       be passed as the second argument. The grouping key will be passed as a tuple of numpy
    +       data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
    +       as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
    +       This is useful when the user doesn't want to hardcode grouping key in the function.
    +
    +       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +       >>> df = spark.createDataFrame(
    +       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    +       ...     ("id", "v"))  # doctest: +SKIP
    +       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
    +       ... def mean_udf(key, pdf):
    +       ...     # key is a tuple of one numpy.int64, which is the value
    +       ...     # of 'id' for the current group
    +       ...     return pd.DataFrame([key + (pdf.v.mean(),)])
    --- End diff --
    
    Added


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171469275
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2253,6 +2253,30 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            |  2| 1.1094003924504583|
            +---+-------------------+
     
    +       Alternatively, the user can define a function that takes two arguments.
    +       In this case, the grouping key will be passed as the first argument and the data will
    +       be passed as the second argument. The grouping key will be passed as a tuple of numpy
    +       data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
    +       as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
    +       This is useful when the user doesn't want to hardcode grouping key in the function.
    +
    +       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +       >>> df = spark.createDataFrame(
    +       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    +       ...     ("id", "v"))  # doctest: +SKIP
    +       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
    +       ... def mean_udf(key, pdf):
    +       ...     # key is a tuple of one numpy.int64, which is the value
    +       ...     # of 'id' for the current group
    +       ...     return pd.DataFrame([key + (pdf.v.mean(),)])
    +       >>> df.groupby('id').apply(mean_udf).show()  #doctest: +SKIP
    --- End diff --
    
    I think it's because we couldn't find yet a min fix to enable the doctests only when PyArrow and Pandas are installed.
    
    Maybe we can try to drop doctests right before we run `doctest.testmod` below conditionally but it's kind of a new approach to Spark as far as I know.
    
    Will probably take a look for it separately soon. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Addressed all comments and manually tested the example in docstring.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011][SQL][PYTHON] Support alternative fu...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172699274
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -35,24 +37,29 @@ def _wrap_function(sc, func, returnType):
                                       sc.pythonVer, broadcast_vars, sc._javaAccumulator)
     
     
    +def _get_argspec(f):
    --- End diff --
    
    How about putting this in pyspark.util? It might be useful in places other than sql


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171305660
  
    --- Diff: python/pyspark/worker.py ---
    @@ -149,18 +156,30 @@ def read_udfs(pickleSer, infile, eval_type):
         num_udfs = read_int(infile)
         udfs = {}
         call_udf = []
    -    for i in range(num_udfs):
    +    mapper_str = ""
    +    if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    +        # Create function like this:
    +        #   lambda a: f([a[0]], [a[0], a[1]])
    +        assert num_udfs == 1
             arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
    -        udfs['f%d' % i] = udf
    -        args = ["a[%d]" % o for o in arg_offsets]
    -        call_udf.append("f%d(%s)" % (i, ", ".join(args)))
    -    # Create function like this:
    -    #   lambda a: (f0(a0), f1(a1, a2), f2(a3))
    -    # In the special case of a single UDF this will return a single result rather
    -    # than a tuple of results; this is the format that the JVM side expects.
    -    mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
    -    mapper = eval(mapper_str, udfs)
    +        udfs['f'] = udf
    +        split_offset = arg_offsets[0] + 1
    +        arg0 = ["a[%d]" % o for o in arg_offsets[1: split_offset]]
    +        arg1 = ["a[%d]" % o for o in arg_offsets[split_offset:]]
    +        mapper_str = ("lambda a: f([%s], [%s])" % (", ".join(arg0), ", ".join(arg1)))
    +    else:
    +        # Create function like this:
    +        #   lambda a: (f0(a[0]), f1(a[1], a[2]), f2(a[3]))
    +        # In the special case of a single UDF this will return a single result rather
    +        # than a tuple of results; this is the format that the JVM side expects.
    +        for i in range(num_udfs):
    +            arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
    +            udfs['f%d' % i] = udf
    +            args = ["a[%d]" % o for o in arg_offsets]
    +            call_udf.append("f%d(%s)" % (i, ", ".join(args)))
    +            mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
    --- End diff --
    
    This line seems misindented.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86290/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87455/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Hi all,
    
    I did some digging and I think adding a serialization form that serialize a key object along with a Arrow record batch is quite complicated because we are using ArrowStreamReader/Writer for sending batches and send extra key data would have to use a lower level Arrow API for sending/receiving batches.
    
    I did two things to convince myself the current approach is fine:
    * I add logic to de duplicate grouping key they are already in data columns. i.e., if a user calls
    ```
    df.groupby('id').apply(foo_udf)
    ```
    We will not send extra grouping columns because those are already part of data columns. Instead, we will just use the corresponding data column to get grouping key to pass to user function. However, if user calls:
    ```
    df.groupby(df.id % 2).apply(foo_udf)
    ```
    then an extra column `df.id % 2` will be created and sent to python worker. But I think this is an uncommon case.
    
    * I did some benchmark to see the impact of sending extra grouping column. I used a Spark DataFrame of a single column to maximize the effect of the extra grouping column (basically sending extra grouping column will double the data to be sent to python in the benchmark, however in real use cases the effect of sending extra grouping columns should be far less).
    Even with the setting of the benchmark, I have not observed performance diffs when sending extra grouping columns, I think this is because the total time is dominated by other parts of the computation. [micro benchmark](https://gist.github.com/icexelloss/88f6c6fdaf04aac39d68d74cd0942c07)
    
    I'd like to leave the work for more flexible arrow serialization as future work because it doesn't seems to affect performance of this patch and proceed with the current patch based on the two points above. What do you guys think?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86651 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86651/testReport)** for PR 20295 at commit [`6b8fdbc`](https://github.com/apache/spark/commit/6b8fdbc61ce8ef03a5ac54d65babc29c0697c9c4).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87463/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172249757
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2253,6 +2253,30 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            |  2| 1.1094003924504583|
            +---+-------------------+
     
    +       Alternatively, the user can define a function that takes two arguments.
    +       In this case, the grouping key will be passed as the first argument and the data will
    +       be passed as the second argument. The grouping key will be passed as a tuple of numpy
    +       data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
    +       as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
    +       This is useful when the user doesn't want to hardcode grouping key in the function.
    +
    +       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +       >>> df = spark.createDataFrame(
    +       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    +       ...     ("id", "v"))  # doctest: +SKIP
    +       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
    --- End diff --
    
    Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86834 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86834/testReport)** for PR 20295 at commit [`2668251`](https://github.com/apache/spark/commit/266825167f0bf308c0b4213b1ef718a930a47c2b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/241/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86837 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86837/testReport)** for PR 20295 at commit [`8f0782c`](https://github.com/apache/spark/commit/8f0782c07f4c6f02610918e6d4edc5907f7d6aaa).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86651 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86651/testReport)** for PR 20295 at commit [`6b8fdbc`](https://github.com/apache/spark/commit/6b8fdbc61ce8ef03a5ac54d65babc29c0697c9c4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    LGTM except for @BryanCutler's suggestion (https://github.com/apache/spark/pull/20295#discussion_r172374978). Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/395/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/203/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88049/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    BTW, let's don't forget to fix the doc later .. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86651/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172249706
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2253,6 +2253,30 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            |  2| 1.1094003924504583|
            +---+-------------------+
     
    +       Alternatively, the user can define a function that takes two arguments.
    +       In this case, the grouping key will be passed as the first argument and the data will
    +       be passed as the second argument. The grouping key will be passed as a tuple of numpy
    +       data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
    +       as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
    +       This is useful when the user doesn't want to hardcode grouping key in the function.
    --- End diff --
    
    Changed to "does not"


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/396/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86604 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86604/testReport)** for PR 20295 at commit [`c7fccde`](https://github.com/apache/spark/commit/c7fccde701723c8808b095586e987ad02fc171c7).
     * This patch **fails PySpark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171315375
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ---
    @@ -75,28 +76,66 @@ case class FlatMapGroupsInPandasExec(
         val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
         val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
         val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
    -    val argOffsets = Array((0 until (child.output.length - groupingAttributes.length)).toArray)
    -    val schema = StructType(child.schema.drop(groupingAttributes.length))
         val sessionLocalTimeZone = conf.sessionLocalTimeZone
         val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
     
    +    // Deduplicate the grouping attributes.
    +    // If a grouping attribute also appears in data attributes, then we don't need to send the
    +    // grouping attribute to Python worker. If a grouping attribute is not in data attributes,
    +    // then we need to send this grouping attribute to python worker.
    +    //
    +    // We use argOffsets to distinguish grouping attributes and data attributes as following:
    +    //
    +    // argOffsets[0] is the length of grouping attributes
    +    // argOffsets[1 .. argOffsets[0]+1] is the arg offsets for grouping attributes
    +    // argOffsets[argOffsets[0]+1 .. ] is the arg offsets for data attributes
    +
    +    val dupGroupingIndices = new ArrayBuffer[Int]
    +    val groupingArgOffsets = new ArrayBuffer[Int]
    +    val extraGroupingAttributes = new ArrayBuffer[Attribute]
    +
    +    val dataAttributes = child.output.drop(groupingAttributes.length)
    +    groupingAttributes.foreach { attribute =>
    +      val index = dataAttributes.indexWhere(
    +        childAttribute => attribute.semanticEquals(childAttribute))
    +      dupGroupingIndices += index
    +    }
    --- End diff --
    
    nit:
    
    ```scala
    val dupGroupingIndices = groupingAttributes.map { attribute =>
        dataAttributes.indexWhere(attribute.semanticEquals)
    }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171285307
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2253,6 +2253,30 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            |  2| 1.1094003924504583|
            +---+-------------------+
     
    +       Alternatively, the user can define a function that takes two arguments.
    +       In this case, the grouping key will be passed as the first argument and the data will
    +       be passed as the second argument. The grouping key will be passed as a tuple of numpy
    +       data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
    +       as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
    +       This is useful when the user doesn't want to hardcode grouping key in the function.
    --- End diff --
    
    I usually avoid abbreviation like `doesn't` in doc but I am not sure if this actually matters though.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87463 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87463/testReport)** for PR 20295 at commit [`cf45827`](https://github.com/apache/spark/commit/cf458279c4630ed0fb565b6836d10b909185309e).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/905/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87480 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87480/testReport)** for PR 20295 at commit [`9ed3779`](https://github.com/apache/spark/commit/9ed3779b665c90e5bb25bc6636997a4b080c3d34).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r164483676
  
    --- Diff: python/pyspark/sql/udf.py ---
    @@ -54,7 +54,7 @@ def _create_udf(f, returnType, evalType):
                     "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
                 )
     
    -        if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and len(argspec.args) != 1:
    +        if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and len(argspec.args) not in (1, 2):
                 raise ValueError(
                     "Invalid function: pandas_udfs with function type GROUP_MAP "
                     "must take a single arg that is a pandas DataFrame."
    --- End diff --
    
    Aha good catch. Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Sounds good. Let's track in https://issues.apache.org/jira/browse/SPARK-23633


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88050 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88050/testReport)** for PR 20295 at commit [`4b61f52`](https://github.com/apache/spark/commit/4b61f52497e9f356015ae7905fd08da785e90f08).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1332/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/924/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172249841
  
    --- Diff: python/pyspark/worker.py ---
    @@ -149,18 +156,30 @@ def read_udfs(pickleSer, infile, eval_type):
         num_udfs = read_int(infile)
         udfs = {}
         call_udf = []
    -    for i in range(num_udfs):
    +    mapper_str = ""
    +    if eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    +        # Create function like this:
    +        #   lambda a: f([a[0]], [a[0], a[1]])
    +        assert num_udfs == 1
             arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
    -        udfs['f%d' % i] = udf
    -        args = ["a[%d]" % o for o in arg_offsets]
    -        call_udf.append("f%d(%s)" % (i, ", ".join(args)))
    -    # Create function like this:
    -    #   lambda a: (f0(a0), f1(a1, a2), f2(a3))
    -    # In the special case of a single UDF this will return a single result rather
    -    # than a tuple of results; this is the format that the JVM side expects.
    -    mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
    -    mapper = eval(mapper_str, udfs)
    +        udfs['f'] = udf
    +        split_offset = arg_offsets[0] + 1
    +        arg0 = ["a[%d]" % o for o in arg_offsets[1: split_offset]]
    +        arg1 = ["a[%d]" % o for o in arg_offsets[split_offset:]]
    +        mapper_str = ("lambda a: f([%s], [%s])" % (", ".join(arg0), ", ".join(arg1)))
    +    else:
    +        # Create function like this:
    +        #   lambda a: (f0(a[0]), f1(a[1], a[2]), f2(a[3]))
    +        # In the special case of a single UDF this will return a single result rather
    +        # than a tuple of results; this is the format that the JVM side expects.
    +        for i in range(num_udfs):
    +            arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type)
    +            udfs['f%d' % i] = udf
    +            args = ["a[%d]" % o for o in arg_offsets]
    +            call_udf.append("f%d(%s)" % (i, ", ".join(args)))
    +            mapper_str = "lambda a: (%s)" % (", ".join(call_udf))
    --- End diff --
    
    Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [WIP][SPARK-23011] Support alternative function f...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r163904221
  
    --- Diff: python/pyspark/serializers.py ---
    @@ -267,13 +267,13 @@ def load_stream(self, stream):
             """
             Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
             """
    -        from pyspark.sql.types import _check_dataframe_localize_timestamps
    +        from pyspark.sql.types import _check_series_localize_timestamps
             import pyarrow as pa
             reader = pa.open_stream(stream)
             for batch in reader:
                 # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1
    -            pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone)
    -            yield [c for _, c in pdf.iteritems()]
    +            yield [_check_series_localize_timestamps(c.to_pandas(), self._timezone)
    +                   for c in pa.Table.from_batches([batch]).itercolumns()]
    --- End diff --
    
    @BryanCutler Thanks for the clarification. I removed the note.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    How are you going to send the group columns? For a group we have only one group row and a bunch of data rows.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172583099
  
    --- Diff: python/pyspark/worker.py ---
    @@ -91,10 +92,16 @@ def verify_result_length(*a):
     
     
     def wrap_grouped_map_pandas_udf(f, return_type):
    -    def wrapped(*series):
    +    def wrapped(key_series, value_series):
             import pandas as pd
    +        argspec = inspect.getargspec(f)
    --- End diff --
    
    Good point. Let me take a look at that.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88049 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88049/testReport)** for PR 20295 at commit [`d51bc2e`](https://github.com/apache/spark/commit/d51bc2e9d0977177e0aff0aeab242376981b6b0d).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1354/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/205/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/641/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86606 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86606/testReport)** for PR 20295 at commit [`cda97f1`](https://github.com/apache/spark/commit/cda97f142285bc01a2da96765d3703d37b4cb802).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #88048 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88048/testReport)** for PR 20295 at commit [`59bdf20`](https://github.com/apache/spark/commit/59bdf200c6b922d18a64db0c7a67f9ec069c67ba).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/206/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #87736 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87736/testReport)** for PR 20295 at commit [`9ed3779`](https://github.com/apache/spark/commit/9ed3779b665c90e5bb25bc6636997a4b080c3d34).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87480/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Will merge this one if there's no more comments or not merged within few days.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1355/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: wip: [SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011][SQL][PYTHON] Support alternative function ...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged to master.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87129/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: wip: [SPARK-23011] Support alternative function form wit...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86286/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [WIP][SPARK-23011] Support alternative function form wit...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    **[Test build #86290 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86290/testReport)** for PR 20295 at commit [`7ce3fa7`](https://github.com/apache/spark/commit/7ce3fa79f8f63d4320c386ed81fa0b57d6bb7a91).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20295: [SPARK-23011] Support alternative function form w...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r171297236
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2253,6 +2253,30 @@ def pandas_udf(f=None, returnType=None, functionType=None):
            |  2| 1.1094003924504583|
            +---+-------------------+
     
    +       Alternatively, the user can define a function that takes two arguments.
    +       In this case, the grouping key will be passed as the first argument and the data will
    +       be passed as the second argument. The grouping key will be passed as a tuple of numpy
    +       data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
    +       as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
    +       This is useful when the user doesn't want to hardcode grouping key in the function.
    +
    +       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
    +       >>> df = spark.createDataFrame(
    +       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    +       ...     ("id", "v"))  # doctest: +SKIP
    +       >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)  # doctest: +SKIP
    +       ... def mean_udf(key, pdf):
    +       ...     # key is a tuple of one numpy.int64, which is the value
    +       ...     # of 'id' for the current group
    +       ...     return pd.DataFrame([key + (pdf.v.mean(),)])
    --- End diff --
    
    nit: `import pandas as pd`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Don't worry, I am keeping my eyes on this and I believe @ueshin too.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20295: [SPARK-23011] Support alternative function form with gro...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/340/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org