You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/29 20:46:45 UTC

[GitHub] [spark] grundprinzip opened a new pull request, #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

grundprinzip opened a new pull request, #38841:
URL: https://github.com/apache/spark/pull/38841

   ### What changes were proposed in this pull request?
   Previously, the `avg` function was missing in the `GroupedData` class. This patch adds this method and the necessary plan transformation using an unresolved function. In addition, it identified a small issue where when an alias is used for a grouping column, the planner would incorrectly try to wrap the existing alias expression using an unresolved alias which would then fail.
   
   ```
   df = (
       self.connect.range(10)
       .groupBy((col("id") % lit(2)).alias("moded"))
       .avg("id")
       .sort("moded")
   )
   ```
   
   
   ### Why are the changes needed?
   Bug / Compatibility
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   UT


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38841:
URL: https://github.com/apache/spark/pull/38841#discussion_r1035498320


##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -722,6 +722,19 @@ def test_write_operations(self):
         ndf = self.connect.read.table("parquet_test")
         self.assertEqual(set(df.collect()), set(ndf.collect()))
 
+    def test_agg_with_avg(self):
+        """SPARK-41325: groupby.avg()"""
+        df = (
+            self.connect.range(10)
+            .groupBy((col("id") % lit(2)).alias("moded"))

Review Comment:
   DataFrame API creates resolved `Alias` as well. I'm not sure why `UnresolvedAlias(Alias...)` is a problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38841:
URL: https://github.com/apache/spark/pull/38841#discussion_r1035694342


##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -722,6 +722,19 @@ def test_write_operations(self):
         ndf = self.connect.read.table("parquet_test")
         self.assertEqual(set(df.collect()), set(ndf.collect()))
 
+    def test_agg_with_avg(self):
+        """SPARK-41325: groupby.avg()"""

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38841:
URL: https://github.com/apache/spark/pull/38841#discussion_r1035294468


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -651,6 +651,7 @@ class SparkConnectPlanner(session: SparkSession) {
         .map(transformExpression)
         .map {
           case x @ UnresolvedAttribute(_) => x
+          case x @ Alias(_, _) => x

Review Comment:
   I think it would also make sense to add a test on scala side given this touches server.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38841:
URL: https://github.com/apache/spark/pull/38841#discussion_r1035422701


##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -722,6 +722,19 @@ def test_write_operations(self):
         ndf = self.connect.read.table("parquet_test")
         self.assertEqual(set(df.collect()), set(ndf.collect()))
 
+    def test_agg_with_avg(self):
+        """SPARK-41325: groupby.avg()"""

Review Comment:
   ```suggestion
           # SPARK-41325: groupby.avg()
   ```
   
   per [contribution guide ](https://spark.apache.org/contributing.html).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng closed pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
zhengruifeng closed pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF
URL: https://github.com/apache/spark/pull/38841


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38841:
URL: https://github.com/apache/spark/pull/38841#issuecomment-1331701068

   Thank you. I'll address the merge conflicts next. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38841:
URL: https://github.com/apache/spark/pull/38841#discussion_r1035419875


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -651,6 +651,7 @@ class SparkConnectPlanner(session: SparkSession) {
         .map(transformExpression)
         .map {
           case x @ UnresolvedAttribute(_) => x
+          case x @ Alias(_, _) => x

Review Comment:
   ```suggestion
             case a: UnresolvedAttribute | Alias => a
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38841:
URL: https://github.com/apache/spark/pull/38841#issuecomment-1331425929

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38841:
URL: https://github.com/apache/spark/pull/38841#issuecomment-1331575966

   this pr itself lgtm, but there are behavior changes in `groupby.{min, max, sum, avg, mean}`:
   
   1, pyspark and scala api only accept string `*str` / `string*`
   2, pyspark and scala api will check the schema, if the datatype is unexpected, it fails;
   3, if the no input column, it will check the schema and select all the numeric columns.
   
   
   shall we add a dedicate proto for `groupby.{min, max, sum, avg, mean}`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38841:
URL: https://github.com/apache/spark/pull/38841#issuecomment-1331648955

   
   > 1, pyspark and scala api only accept string `*str` / `string*`
   
   @zhengruifeng can you elaborate? I tested the same code in PySpark and it works as well. 
   
   
   
   > 2, pyspark and scala api will check the schema, if the datatype is unexpected, it fails;
   
   What do you mean? 
   
   
   > 
   > 3, if the no input column, it will check the schema and select all the numeric columns.
   
   This is more missing functionality than this particular bug correct?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38841:
URL: https://github.com/apache/spark/pull/38841#discussion_r1035693746


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -651,6 +651,7 @@ class SparkConnectPlanner(session: SparkSession) {
         .map(transformExpression)
         .map {
           case x @ UnresolvedAttribute(_) => x
+          case x @ Alias(_, _) => x

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38841:
URL: https://github.com/apache/spark/pull/38841#issuecomment-1331700529

   this inconsistency was not introduced by this PR, so it LGTM.
   
   I think we can merge it first, I create https://issues.apache.org/jira/browse/SPARK-41333 to track this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38841:
URL: https://github.com/apache/spark/pull/38841#issuecomment-1331982598

   merged into master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38841:
URL: https://github.com/apache/spark/pull/38841#discussion_r1035422454


##########
python/pyspark/sql/tests/connect/test_connect_basic.py:
##########
@@ -722,6 +722,19 @@ def test_write_operations(self):
         ndf = self.connect.read.table("parquet_test")
         self.assertEqual(set(df.collect()), set(ndf.collect()))
 
+    def test_agg_with_avg(self):
+        """SPARK-41325: groupby.avg()"""
+        df = (
+            self.connect.range(10)
+            .groupBy((col("id") % lit(2)).alias("moded"))

Review Comment:
   Hm, actually shouldn't we always use unresolved alias? Seems like the alias here is resolved (see https://github.com/apache/spark/commit/0f7eaeee6445aaf05310229bdec22f6953113f2e) cc @cloud-fan .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] grundprinzip commented on a diff in pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38841:
URL: https://github.com/apache/spark/pull/38841#discussion_r1035307889


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -651,6 +651,7 @@ class SparkConnectPlanner(session: SparkSession) {
         .map(transformExpression)
         .map {
           case x @ UnresolvedAttribute(_) => x
+          case x @ Alias(_, _) => x

Review Comment:
   I'd like to do that as a follow up. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38841:
URL: https://github.com/apache/spark/pull/38841#discussion_r1035421186


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -651,6 +651,7 @@ class SparkConnectPlanner(session: SparkSession) {
         .map(transformExpression)
         .map {
           case x @ UnresolvedAttribute(_) => x
+          case x @ Alias(_, _) => x

Review Comment:
   ```suggestion
             case ua: UnresolvedAttribute => ua
             case a: Alias => a
   ```
   
   per https://github.com/databricks/scala-style-guide#pattern-matching



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #38841: [SPARK-41325] [CONNECT] Fix missing avg() for GroupBy on DF

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38841:
URL: https://github.com/apache/spark/pull/38841#issuecomment-1331687981

   > > 1, pyspark and scala api only accept string `*str` / `string*`
   > 
   > @zhengruifeng can you elaborate? I tested the same code in PySpark and it works as well.
   > 
   > > 2, pyspark and scala api will check the schema, if the datatype is unexpected, it fails;
   > 
   > What do you mean?
   > 
   > > 3, if the no input column, it will check the schema and select all the numeric columns.
   > 
   > This is more missing functionality than this particular bug correct?
   
   1, pyspark and scala don't take expression or column as input:
   
   ```
   In [11]: df = spark.createDataFrame([(10, 80, "Alice"), (5, None, "Bob"), (None, 10, "Tom"), (None, None, None)], schema=["age", "height", "name"])
   
   In [12]: df.show()
   +----+------+-----+
   | age|height| name|
   +----+------+-----+
   |  10|    80|Alice|
   |   5|  null|  Bob|
   |null|    10|  Tom|
   |null|  null| null|
   +----+------+-----+
   
   
   In [13]: df.groupBy("age").min(df.height)
   ---------------------------------------------------------------------------
   TypeError                                 Traceback (most recent call last)
   Cell In[13], line 1
   ----> 1 df.groupBy("age").min(df.height)
   
   File ~/Dev/spark/python/pyspark/sql/group.py:49, in df_varargs_api.<locals>._api(self, *cols)
        47 def _api(self: "GroupedData", *cols: str) -> DataFrame:
        48     name = f.__name__
   ---> 49     jdf = getattr(self._jgd, name)(_to_seq(self.session._sc, cols))
        50     return DataFrame(jdf, self.session)
   
   ...
   
   TypeError: Column is not iterable
   
   ```
   
   2, if input columns contains non-acceptable datatypes, it fails like
   
   ```
   In [14]: df.groupBy("age").min("name")
   ---------------------------------------------------------------------------
   AnalysisException                         Traceback (most recent call last)
   Cell In[14], line 1
   ----> 1 df.groupBy("age").min("name")
   
   File ~/Dev/spark/python/pyspark/sql/group.py:49, in df_varargs_api.<locals>._api(self, *cols)
        47 def _api(self: "GroupedData", *cols: str) -> DataFrame:
        48     name = f.__name__
   ---> 49     jdf = getattr(self._jgd, name)(_to_seq(self.session._sc, cols))
        50     return DataFrame(jdf, self.session)
   
   File ~/Dev/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
      1316 command = proto.CALL_COMMAND_NAME +\
      1317     self.command_header +\
      1318     args_command +\
      1319     proto.END_COMMAND_PART
      1321 answer = self.gateway_client.send_command(command)
   -> 1322 return_value = get_return_value(
      1323     answer, self.gateway_client, self.target_id, self.name)
      1325 for temp_arg in temp_args:
      1326     if hasattr(temp_arg, "_detach"):
   
   File ~/Dev/spark/python/pyspark/sql/utils.py:205, in capture_sql_exception.<locals>.deco(*a, **kw)
       201 converted = convert_exception(e.java_exception)
       202 if not isinstance(converted, UnknownException):
       203     # Hide where the exception came from that shows a non-Pythonic
       204     # JVM exception message.
   --> 205     raise converted from None
       206 else:
       207     raise
   
   AnalysisException: "name" is not a numeric column. Aggregation function can only be applied on a numeric column.
   
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org