Posted to reviews@spark.apache.org by "amaliujia (via GitHub)" <gi...@apache.org> on 2023/02/23 20:57:31 UTC

[GitHub] [spark] amaliujia opened a new pull request, #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

amaliujia opened a new pull request, #40145:
URL: https://github.com/apache/spark/pull/40145

   
   ### What changes were proposed in this pull request?
   Support Pivot with provided pivot column values. Pivot without provided column values is not supported yet: it requires a check against the maximum number of pivot values, which depends on how Spark configuration is implemented in Spark Connect.
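   
   For reference, the supported call shape mirrors the existing `Dataset` API. A minimal sketch (column names and data are illustrative, taken from the Scaladoc examples in this PR):
   
   ```scala
   // Pivot with explicitly provided values: each value becomes an output column.
   df.groupBy("year")
     .pivot("course", Seq("dotNET", "Java"))
     .sum("earnings")
   
   // Pivot without provided values stays unsupported in this PR, since it would
   // require computing the distinct values server-side under a max-values
   // configuration check.
   // df.groupBy("year").pivot("course").sum("earnings")
   ```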
   
   ### Why are the changes needed?
   Improves API coverage of the Spark Connect Scala client.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Unit tests.
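   
   As a sketch of the kind of behavior the unit tests cover (test data and assertions are illustrative, not the PR's actual test code):
   
   ```scala
   // Assumes `spark.implicits._` is in scope for toDF.
   val df = Seq(
     (2012, "dotNET", 10000), (2012, "Java", 20000),
     (2013, "dotNET", 48000), (2013, "Java", 30000)
   ).toDF("year", "course", "earnings")
   
   // Pivot with provided values: one output column per provided value.
   val result = df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings")
   assert(result.columns.toSeq == Seq("year", "dotNET", "Java"))
   ```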


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40145:
URL: https://github.com/apache/spark/pull/40145#issuecomment-1444946241

   Merging




[GitHub] [spark] amaliujia commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116602153


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -47,14 +48,18 @@ class RelationalGroupedDataset protected[sql] (
         .addAllGroupingExpressions(groupingExprs.asJava)
         .addAllAggregateExpressions(aggExprs.map(e => e.expr).asJava)
 
-      // TODO: support Pivot.
       groupType match {
         case proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
         case proto.Aggregate.GroupType.GROUP_TYPE_CUBE =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
         case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
+        case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT if pivot.isDefined =>
+          builder.getAggregateBuilder
+            .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_PIVOT)
+            .setPivot(pivot.get)
+        // TODO: throw proper error message for PIVOT when pivot proto is not defined.

Review Comment:
   Fixed. Because this is an internal API, I switched to `assert`: there shouldn't be an empty pivot proto here, and if there is one, that's a bug we need to fix.





[GitHub] [spark] hvanhovell commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116493586


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -47,14 +48,18 @@ class RelationalGroupedDataset protected[sql] (
         .addAllGroupingExpressions(groupingExprs.asJava)
         .addAllAggregateExpressions(aggExprs.map(e => e.expr).asJava)
 
-      // TODO: support Pivot.
       groupType match {
         case proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
         case proto.Aggregate.GroupType.GROUP_TYPE_CUBE =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
         case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
+        case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT if pivot.isDefined =>
+          builder.getAggregateBuilder
+            .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_PIVOT)
+            .setPivot(pivot.get)
+        // TODO: throw proper error message for PIVOT when pivot proto is not defined.

Review Comment:
   Well, we messed up. I guess `IllegalArgumentException`?





[GitHub] [spark] amaliujia commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116475547


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -47,14 +48,18 @@ class RelationalGroupedDataset protected[sql] (
         .addAllGroupingExpressions(groupingExprs.asJava)
         .addAllAggregateExpressions(aggExprs.map(e => e.expr).asJava)
 
-      // TODO: support Pivot.
       groupType match {
         case proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
         case proto.Aggregate.GroupType.GROUP_TYPE_CUBE =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
         case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
+        case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT if pivot.isDefined =>
+          builder.getAggregateBuilder
+            .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_PIVOT)
+            .setPivot(pivot.get)
+        // TODO: throw proper error message for PIVOT when pivot proto is not defined.

Review Comment:
   Basically, what is the proper exception we should use for this case?





[GitHub] [spark] amaliujia commented on pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on PR #40145:
URL: https://github.com/apache/spark/pull/40145#issuecomment-1442422396

   @hvanhovell 




[GitHub] [spark] hvanhovell commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116885074


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -47,14 +48,18 @@ class RelationalGroupedDataset protected[sql] (
         .addAllGroupingExpressions(groupingExprs.asJava)
         .addAllAggregateExpressions(aggExprs.map(e => e.expr).asJava)
 
-      // TODO: support Pivot.
       groupType match {
         case proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
         case proto.Aggregate.GroupType.GROUP_TYPE_CUBE =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
         case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
+        case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT =>
+          assert(pivot.isEmpty)

Review Comment:
   `pivot.isDefined`?





[GitHub] [spark] hvanhovell commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116493326


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -234,4 +239,132 @@ class RelationalGroupedDataset protected[sql] (
   def sum(colNames: String*): DataFrame = {
     toDF(colNames.map(colName => functions.sum(colName)))
   }
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation. There are
+   * two versions of pivot function: one that requires the caller to specify the list of distinct
+   * values to pivot on, and one that does not. The latter is more concise but less efficient,
+   * because Spark needs to first compute the list of distinct values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings")
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings")
+   * }}}
+   *
+   * From Spark 3.0.0, values can be literal columns, for instance, struct. For pivoting by
+   * multiple columns, use the `struct` function to combine the columns and values:
+   *
+   * {{{
+   *   df.groupBy("year")
+   *     .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
+   *     .agg(sum($"earnings"))
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   Name of the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
+    pivot(Column(pivotColumn), values)
+  }
+
+  /**
+   * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
+   * aggregation.
+   *
+   * There are two versions of pivot function: one that requires the caller to specify the list of
+   * distinct values to pivot on, and one that does not. The latter is more concise but less
+   * efficient, because Spark needs to first compute the list of distinct values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", "Java")).sum("earnings");
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings");
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   Name of the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = {
+    pivot(Column(pivotColumn), values)
+  }
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation. This is an
+   * overloaded version of the `pivot` method with `pivotColumn` of the `String` type.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+    groupType match {
+      case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
+        val valueExprs = values.map(_ match {
+          case c: Column if c.expr.hasLiteral => c.expr.getLiteral
+          case v => functions.lit(v).expr.getLiteral

Review Comment:
   I would throw an exception.





[GitHub] [spark] amaliujia commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116475340


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -47,14 +48,18 @@ class RelationalGroupedDataset protected[sql] (
         .addAllGroupingExpressions(groupingExprs.asJava)
         .addAllAggregateExpressions(aggExprs.map(e => e.expr).asJava)
 
-      // TODO: support Pivot.
       groupType match {
         case proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
         case proto.Aggregate.GroupType.GROUP_TYPE_CUBE =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
         case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
+        case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT if pivot.isDefined =>
+          builder.getAggregateBuilder
+            .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_PIVOT)
+            .setPivot(pivot.get)
+        // TODO: throw proper error message for PIVOT when pivot proto is not defined.

Review Comment:
   So do you think throwing `UnsupportedOperationException` is OK for now? I cannot find a better way to reuse existing SQL errors at the moment.
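   
   For concreteness, a sketch of the fallback arm being discussed (illustrative only; per the thread above, the PR ultimately settled on an `assert` instead):
   
   ```scala
   case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT =>
     // Reaching PIVOT without a pivot proto would indicate a client-side bug.
     throw new UnsupportedOperationException(
       "PIVOT group type requires pivot column values")
   ```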





[GitHub] [spark] amaliujia commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1117335432


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -47,14 +48,18 @@ class RelationalGroupedDataset protected[sql] (
         .addAllGroupingExpressions(groupingExprs.asJava)
         .addAllAggregateExpressions(aggExprs.map(e => e.expr).asJava)
 
-      // TODO: support Pivot.
       groupType match {
         case proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
         case proto.Aggregate.GroupType.GROUP_TYPE_CUBE =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
         case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
+        case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT =>
+          assert(pivot.isEmpty)

Review Comment:
   Oops, yes. Updated.
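   
   Presumably along these lines (a sketch of the corrected arm, based on the hunk quoted above):
   
   ```scala
   case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT =>
     // The pivot proto must be present for PIVOT; its absence is a bug.
     assert(pivot.isDefined)
     builder.getAggregateBuilder
       .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_PIVOT)
       .setPivot(pivot.get)
   ```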





[GitHub] [spark] hvanhovell closed pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values
URL: https://github.com/apache/spark/pull/40145




[GitHub] [spark] hvanhovell commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116471285


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -47,14 +48,18 @@ class RelationalGroupedDataset protected[sql] (
         .addAllGroupingExpressions(groupingExprs.asJava)
         .addAllAggregateExpressions(aggExprs.map(e => e.expr).asJava)
 
-      // TODO: support Pivot.
       groupType match {
         case proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_ROLLUP)
         case proto.Aggregate.GroupType.GROUP_TYPE_CUBE =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_CUBE)
         case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
           builder.getAggregateBuilder.setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
+        case proto.Aggregate.GroupType.GROUP_TYPE_PIVOT if pivot.isDefined =>
+          builder.getAggregateBuilder
+            .setGroupType(proto.Aggregate.GroupType.GROUP_TYPE_PIVOT)
+            .setPivot(pivot.get)
+        // TODO: throw proper error message for PIVOT when pivot proto is not defined.

Review Comment:
   Please add tests for all errors thrown.





[GitHub] [spark] amaliujia commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116476231


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -234,4 +239,132 @@ class RelationalGroupedDataset protected[sql] (
   def sum(colNames: String*): DataFrame = {
     toDF(colNames.map(colName => functions.sum(colName)))
   }
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation. There are
+   * two versions of pivot function: one that requires the caller to specify the list of distinct
+   * values to pivot on, and one that does not. The latter is more concise but less efficient,
+   * because Spark needs to first compute the list of distinct values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings")
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings")
+   * }}}
+   *
+   * From Spark 3.0.0, values can be literal columns, for instance, struct. For pivoting by
+   * multiple columns, use the `struct` function to combine the columns and values:
+   *
+   * {{{
+   *   df.groupBy("year")
+   *     .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
+   *     .agg(sum($"earnings"))
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   Name of the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
+    pivot(Column(pivotColumn), values)
+  }
+
+  /**
+   * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
+   * aggregation.
+   *
+   * There are two versions of pivot function: one that requires the caller to specify the list of
+   * distinct values to pivot on, and one that does not. The latter is more concise but less
+   * efficient, because Spark needs to first compute the list of distinct values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", "Java")).sum("earnings");
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings");
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   Name of the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = {
+    pivot(Column(pivotColumn), values)
+  }
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation. This is an
+   * overloaded version of the `pivot` method with `pivotColumn` of the `String` type.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+    groupType match {
+      case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
+        val valueExprs = values.map(_ match {
+          case c: Column if c.expr.hasLiteral => c.expr.getLiteral
+          case v => functions.lit(v).expr.getLiteral

Review Comment:
   Nice catch. `v` here should actually only cover the non-Column case.
   
   I was not certain how to handle a Column that does not contain a literal. Any idea how to handle that case?







[GitHub] [spark] hvanhovell commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116470992


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -234,4 +239,132 @@ class RelationalGroupedDataset protected[sql] (
   def sum(colNames: String*): DataFrame = {
     toDF(colNames.map(colName => functions.sum(colName)))
   }
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation. There are
+   * two versions of pivot function: one that requires the caller to specify the list of distinct
+   * values to pivot on, and one that does not. The latter is more concise but less efficient,
+   * because Spark needs to first compute the list of distinct values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings")
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings")
+   * }}}
+   *
+   * From Spark 3.0.0, values can be literal columns, for instance, struct. For pivoting by
+   * multiple columns, use the `struct` function to combine the columns and values:
+   *
+   * {{{
+   *   df.groupBy("year")
+   *     .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
+   *     .agg(sum($"earnings"))
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   Name of the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
+    pivot(Column(pivotColumn), values)
+  }
+
+  /**
+   * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
+   * aggregation.
+   *
+   * There are two versions of pivot function: one that requires the caller to specify the list of
+   * distinct values to pivot on, and one that does not. The latter is more concise but less
+   * efficient, because Spark needs to first compute the list of distinct values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", "Java")).sum("earnings");
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings");
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   Name of the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = {
+    pivot(Column(pivotColumn), values)
+  }
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation. This is an
+   * overloaded version of the `pivot` method with `pivotColumn` of the `String` type.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+    groupType match {
+      case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
+        val valueExprs = values.map(_ match {
+          case c: Column if c.expr.hasLiteral => c.expr.getLiteral
+          case v => functions.lit(v).expr.getLiteral

Review Comment:
   If `v` is a `Column` (which can happen because of the guard in the previous case statement), then this will produce an empty literal. The same applies to a `scala.Symbol`.
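   
   One way to reject that case explicitly, as a sketch (the exception type and message are assumptions, not necessarily the PR's final code):
   
   ```scala
   val valueExprs = values.map {
     case c: Column if c.expr.hasLiteral => c.expr.getLiteral
     case c: Column =>
       // Fail fast on a Column without a literal instead of silently
       // producing an empty literal via functions.lit.
       throw new IllegalArgumentException("pivot values must be literal Columns")
     case v => functions.lit(v).expr.getLiteral
   }
   ```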





[GitHub] [spark] amaliujia commented on a diff in pull request #40145: [SPARK-42541][CONNECT] Support Pivot with provided pivot column values

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40145:
URL: https://github.com/apache/spark/pull/40145#discussion_r1116603441


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -234,4 +239,132 @@ class RelationalGroupedDataset protected[sql] (
   def sum(colNames: String*): DataFrame = {
     toDF(colNames.map(colName => functions.sum(colName)))
   }
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation. There are
+   * two versions of pivot function: one that requires the caller to specify the list of distinct
+   * values to pivot on, and one that does not. The latter is more concise but less efficient,
+   * because Spark needs to first compute the list of distinct values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy("year").pivot("course", Seq("dotNET", "Java")).sum("earnings")
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings")
+   * }}}
+   *
+   * From Spark 3.0.0, values can be literal columns, for instance, struct. For pivoting by
+   * multiple columns, use the `struct` function to combine the columns and values:
+   *
+   * {{{
+   *   df.groupBy("year")
+   *     .pivot("trainingCourse", Seq(struct(lit("java"), lit("Experts"))))
+   *     .agg(sum($"earnings"))
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   Name of the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: String, values: Seq[Any]): RelationalGroupedDataset = {
+    pivot(Column(pivotColumn), values)
+  }
+
+  /**
+   * (Java-specific) Pivots a column of the current `DataFrame` and performs the specified
+   * aggregation.
+   *
+   * There are two versions of pivot function: one that requires the caller to specify the list of
+   * distinct values to pivot on, and one that does not. The latter is more concise but less
+   * efficient, because Spark needs to first compute the list of distinct values internally.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy("year").pivot("course", Arrays.<Object>asList("dotNET", "Java")).sum("earnings");
+   *
+   *   // Or without specifying column values (less efficient)
+   *   df.groupBy("year").pivot("course").sum("earnings");
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   Name of the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = {
+    pivot(Column(pivotColumn), values)
+  }
+
+  /**
+   * Pivots a column of the current `DataFrame` and performs the specified aggregation. This is an
+   * overloaded version of the `pivot` method with `pivotColumn` of the `String` type.
+   *
+   * {{{
+   *   // Compute the sum of earnings for each year by course with each course as a separate column
+   *   df.groupBy($"year").pivot($"course", Seq("dotNET", "Java")).sum($"earnings")
+   * }}}
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot` for the reverse operation, except for the
+   *   aggregation.
+   *
+   * @param pivotColumn
+   *   the column to pivot.
+   * @param values
+   *   List of values that will be translated to columns in the output DataFrame.
+   * @since 3.4.0
+   */
+  def pivot(pivotColumn: Column, values: Seq[Any]): RelationalGroupedDataset = {
+    groupType match {
+      case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY =>
+        val valueExprs = values.map(_ match {
+          case c: Column if c.expr.hasLiteral => c.expr.getLiteral
+          case v => functions.lit(v).expr.getLiteral

Review Comment:
   Done, with a test.
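   
   A sketch of what such a test might check (the exception type is an assumption; `intercept` is ScalaTest):
   
   ```scala
   // A non-literal Column as a pivot value should be rejected eagerly
   // on the client side, before any plan is sent to the server.
   intercept[IllegalArgumentException] {
     df.groupBy("year").pivot("course", Seq(df.col("earnings"))).sum("earnings")
   }
   ```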


