Posted to reviews@spark.apache.org by "santosh-d3vpl3x (via GitHub)" <gi...@apache.org> on 2023/02/06 10:24:14 UTC

[GitHub] [spark] santosh-d3vpl3x opened a new pull request, #39902: [SPARK-42349][PYTHON]Support pandas cogroup with multiple df

santosh-d3vpl3x opened a new pull request, #39902:
URL: https://github.com/apache/spark/pull/39902

   The pandas cogroup UDF with applyInPandas currently supports only two dataframes. This is already very useful, but it limits us both API-wise and efficiency-wise when we have to use more than two DFs with cogroup.applyInPandas. This PR adds support for multiple DFs in cogroup.
   
   ```python
   df1 = spark.createDataFrame(
       [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
       ("time", "id", "v1"))
   df2 = spark.createDataFrame(
       [(20000101, 1, "x"), (20000101, 2, "y")],
       ("time", "id", "v2"))
   df3 = spark.createDataFrame(
       [(20000101, 1, "asd"), (20000101, 2, "d")],
       ("time", "id", "v3"))
   df4 = spark.createDataFrame(
       [(20000101, 1, "v"), (20000101, 2, "g")],
       ("time", "id", "v4"))
   def asof_join(df1, df2, df3, df4):
       df12 = pd.merge_asof(df1, df2, on="time", by="id")
       df123 = pd.merge_asof(df12, df3, on="time", by="id")
       df1234 = pd.merge_asof(df123, df4, on="time", by="id")
       return df1234
   df1.groupby("id").cogroup(df2.groupby("id"), df3.groupby("id"), df4.groupby("id")).applyInPandas(
       asof_join, schema="time int, id int, v1 double, v2 string, v3 string, v4 string"
   ).show()
   
   +--------+---+---+---+---+---+
   |    time| id| v1| v2| v3| v4|
   +--------+---+---+---+---+---+
   |20000101|  1|1.0|  x|asd|  v|
   |20000102|  1|3.0|  x|asd|  v|
   |20000101|  2|2.0|  y|  d|  g|
   |20000102|  2|4.0|  y|  d|  g|
   +--------+---+---+---+---+---+
   ```
   
   Note: The previous experimental implementation of the API expects either 2 or 3 args: if 2 args are passed, the UDF receives the 2 cogrouped DFs; if 3 args are passed, the UDF receives the 2 cogrouped DFs with the grouping key as the first arg. That API is limiting and has implications for the change proposed here: there is no clear way to distinguish whether 3 args mean 3 DFs or 2 DFs plus a grouping key (see the sketch below). I have kept the previous implementation and codepath intact, but I would like to get rid of them and make `pass_key` the way of indicating whether the grouping key should be passed to the UDF as its first arg. Question: can I remove the previous implementation in favor of this new idea?
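   
   To make the ambiguity concrete, here is a minimal sketch (hypothetical UDF signatures, not code from this PR) where arity alone cannot distinguish the two cases:
   
   ```python
   # Both UDFs take three positional arguments, so inspecting arity alone
   # cannot tell whether the first argument is a grouping key or a dataframe.
   def udf_three_dfs(df1, df2, df3):  # three cogrouped dataframes
       ...
   
   def udf_key_and_two_dfs(key, left, right):  # grouping key + two dataframes
       ...
   ```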
   
   ![image](https://user-images.githubusercontent.com/3813695/216947300-f1876572-d934-42b6-8003-8df0bde917bc.png)
   




[GitHub] [spark] santosh-d3vpl3x commented on a diff in pull request #39902: [SPARK-42349][PYTHON]Support pandas cogroup with multiple df

Posted by "santosh-d3vpl3x (via GitHub)" <gi...@apache.org>.
santosh-d3vpl3x commented on code in PR #39902:
URL: https://github.com/apache/spark/pull/39902#discussion_r1102879097


##########
python/pyspark/worker.py:
##########
@@ -208,6 +208,41 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se
     return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), to_arrow_type(return_type))]
 
 
+def wrap_multi_cogrouped_map_pandas_udf(f, return_type, runner_conf, argspec):
+    def wrapped(key_series, value_series):
+        import pandas as pd
+
+        dfs = [pd.concat(series, axis=1) for series in value_series]
+
+        if runner_conf.get("pass_key") == "true":

Review Comment:
   Yes indeed, if we don't allow users to pass var-args in UDFs.
   
   I would like to have
   - the capability to pass var-args as input to UDFs
   - explicitness, if we don't follow your last suggestion to always have the first arg be the key
   
   I am waiting for more opinions in this regard. The previous experimental implementation from v3.3.1 is somewhat soft-blocking this PR, and I would prefer to go with your last suggestion that we always pass the key as the first arg, if we can. If we can make that choice then I will have a way to reduce the duplication in the code by following a single codepath for any number of arguments :)
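   
   For reference, a minimal sketch of how the `pass_key` dispatch in the `wrapped` function above could continue (an assumed completion for illustration; the PR's actual body is not shown in this diff, and result verification against `return_type` is omitted):
   
   ```python
   def wrap_multi_cogrouped_map_pandas_udf(f, return_type, runner_conf, argspec):
       def wrapped(key_series, value_series):
           import pandas as pd
   
           # One pandas DataFrame per cogrouped input.
           dfs = [pd.concat(series, axis=1) for series in value_series]
   
           if runner_conf.get("pass_key") == "true":
               # Key columns are constant within a group; take the first row.
               key = tuple(s[0] for s in key_series)
               return f(key, *dfs)
           return f(*dfs)
   
       return wrapped
   ```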





Re: [PR] [SPARK-42349][PYTHON] Support pandas cogroup with multiple df [spark]

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #39902:
URL: https://github.com/apache/spark/pull/39902#issuecomment-1836281160

   @HyukjinKwon @cloud-fan @sunchao @zhengruifeng any chance this gets some attention?






[GitHub] [spark] EnricoMi commented on pull request #39902: [SPARK-42349][PYTHON]Support pandas cogroup with multiple df

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #39902:
URL: https://github.com/apache/spark/pull/39902#issuecomment-1420377190

   Excellent work. I would strongly recommend two things:
   - let's make the existing CoGroup code handle many dataframes; that way lots of code does not get duplicated
   - let's always expect the first argument of the UDF to be the key; things simplify that way, and there is not much overhead in always providing the key
   
   But let's first hear whether the Spark committers are happy to approve either.






[GitHub] [spark] EnricoMi commented on a diff in pull request #39902: [SPARK-42349][PYTHON]Support pandas cogroup with multiple df

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #39902:
URL: https://github.com/apache/spark/pull/39902#discussion_r1103004299


##########
python/pyspark/worker.py:
##########
@@ -208,6 +208,41 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se
     return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), to_arrow_type(return_type))]
 
 
+def wrap_multi_cogrouped_map_pandas_udf(f, return_type, runner_conf, argspec):
+    def wrapped(key_series, value_series):
+        import pandas as pd
+
+        dfs = [pd.concat(series, axis=1) for series in value_series]
+
+        if runner_conf.get("pass_key") == "true":

Review Comment:
   With "var-args" you mean
   
   ```python
   def func(*pdfs: pd.DataFrame) -> pd.DataFrame: ...
   def func_with_key(key, *pdfs: pd.DataFrame) -> pd.DataFrame: ...
   ```
   
   Then `len(argspec.args)` will be `0` for `func` and `1` for `func_with_key`, and `argspec.args` will not be `None` in either case. So restricting the var-args cases to the above signatures (i.e. not allowing `def func(pdf: pd.DataFrame, *pdfs: pd.DataFrame) -> pd.DataFrame`) should make `pass_key` redundant.
   
   Can you give an example of what you mean by "explicitness"?
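   
   For example (a standalone sketch; assuming, as in PySpark's worker, that `argspec` comes from `inspect.getfullargspec`):
   
   ```python
   import inspect
   
   import pandas as pd
   
   def func(*pdfs: pd.DataFrame) -> pd.DataFrame: ...
   
   def func_with_key(key, *pdfs: pd.DataFrame) -> pd.DataFrame: ...
   
   # Named positional parameters land in .args; the *pdfs catch-all in .varargs.
   print(inspect.getfullargspec(func).args)           # []
   print(inspect.getfullargspec(func_with_key).args)  # ['key']
   print(inspect.getfullargspec(func).varargs)        # 'pdfs'
   ```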





[GitHub] [spark] santosh-d3vpl3x commented on pull request #39902: [SPARK-42349][PYTHON]Support pandas cogroup with multiple df

Posted by "santosh-d3vpl3x (via GitHub)" <gi...@apache.org>.
santosh-d3vpl3x commented on PR #39902:
URL: https://github.com/apache/spark/pull/39902#issuecomment-1420537524

   I like the idea of always making the key the first arg.
   
   It is essential to answer the question of how the key should be handled before we can really judge whether the code duplication is worth it or not.




[GitHub] [spark] EnricoMi commented on pull request #39902: [SPARK-42349][PYTHON]Support pandas cogroup with multiple df

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on PR #39902:
URL: https://github.com/apache/spark/pull/39902#issuecomment-1425817737

   cc @zhengruifeng @BryanCutler @HyukjinKwon 






[GitHub] [spark] santosh-d3vpl3x commented on a diff in pull request #39902: [SPARK-42349][PYTHON]Support pandas cogroup with multiple df

Posted by "santosh-d3vpl3x (via GitHub)" <gi...@apache.org>.
santosh-d3vpl3x commented on code in PR #39902:
URL: https://github.com/apache/spark/pull/39902#discussion_r1103146476


##########
python/pyspark/worker.py:
##########
@@ -208,6 +208,41 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se
     return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), to_arrow_type(return_type))]
 
 
+def wrap_multi_cogrouped_map_pandas_udf(f, return_type, runner_conf, argspec):
+    def wrapped(key_series, value_series):
+        import pandas as pd
+
+        dfs = [pd.concat(series, axis=1) for series in value_series]
+
+        if runner_conf.get("pass_key") == "true":

Review Comment:
   ```python
   def func(key, *pdfs: pd.DataFrame) -> pd.DataFrame
   ```
   I would really like to make this the default; then we do not need any **implicit assumption** about whether the first arg is the key.
   
   If replacing the existing way is not an option, then I would like to keep `pass_key` as an **explicit** way for the user to instruct Spark to treat the first arg as the key. Hopefully that clears up the kind of API explicitness I have in mind.
   
   > Then, len(argspec.args) will be 0 for func and 1 for func_with_key and argspec.args will not be None in both cases. So restricting var-args cases to above signatures (not allowing def func(pdf: pd.DataFrame, *pdfs: pd.DataFrame) -> pd.DataFrame) should make pass_key redundant.
   
   That seems quite complicated and not very user-friendly [higher cognitive load].
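   
   For comparison, with the key-always-first default the as-of join example from the PR description might look like this (a sketch of the proposed shape, not merged behavior):
   
   ```python
   import functools
   
   import pandas as pd
   
   def asof_join(key, *pdfs):
       # key is the grouping key tuple, e.g. (1,); pdfs are the cogrouped frames.
       # Fold the frames together with pairwise as-of joins.
       return functools.reduce(
           lambda left, right: pd.merge_asof(left, right, on="time", by="id"), pdfs
       )
   ```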





[GitHub] [spark] github-actions[bot] commented on pull request #39902: [SPARK-42349][PYTHON] Support pandas cogroup with multiple df

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #39902:
URL: https://github.com/apache/spark/pull/39902#issuecomment-1565757025

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] EnricoMi commented on a diff in pull request #39902: [SPARK-42349][PYTHON]Support pandas cogroup with multiple df

Posted by "EnricoMi (via GitHub)" <gi...@apache.org>.
EnricoMi commented on code in PR #39902:
URL: https://github.com/apache/spark/pull/39902#discussion_r1102777270


##########
python/pyspark/worker.py:
##########
@@ -208,6 +208,41 @@ def wrapped(left_key_series, left_value_series, right_key_series, right_value_se
     return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), to_arrow_type(return_type))]
 
 
+def wrap_multi_cogrouped_map_pandas_udf(f, return_type, runner_conf, argspec):
+    def wrapped(key_series, value_series):
+        import pandas as pd
+
+        dfs = [pd.concat(series, axis=1) for series in value_series]
+
+        if runner_conf.get("pass_key") == "true":

Review Comment:
   I think `pass_key` is not needed here, as we have `dfs` and `argspec.args`.
   
   When `len(argspec.args) == len(dfs)`, no key is expected; when `len(argspec.args) == len(dfs) + 1`, the key is expected.
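   
   That rule could be written as a small helper (an illustrative sketch, not code from this PR):
   
   ```python
   import inspect
   
   def expects_key(udf, num_dfs):
       # Arity-based detection: one extra named positional parameter means the
       # UDF wants the grouping key as its first argument.
       nargs = len(inspect.getfullargspec(udf).args)
       if nargs == num_dfs:
           return False
       if nargs == num_dfs + 1:
           return True
       raise ValueError(
           f"UDF declares {nargs} args but {num_dfs} dataframes were cogrouped"
       )
   ```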





[GitHub] [spark] github-actions[bot] closed pull request #39902: [SPARK-42349][PYTHON] Support pandas cogroup with multiple df

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #39902: [SPARK-42349][PYTHON] Support pandas cogroup with multiple df
URL: https://github.com/apache/spark/pull/39902

