You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "zhenlineo (via GitHub)" <gi...@apache.org> on 2023/04/14 19:39:43 UTC

[GitHub] [spark] zhenlineo opened a new pull request, #40796: [WIP]Typed agg functions

zhenlineo opened a new pull request, #40796:
URL: https://github.com/apache/spark/pull/40796

   This PR builds on top of https://github.com/apache/spark/pull/40729
   Read the last commit in this PR for the agg func changes.
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1202132810


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -197,4 +197,22 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
     spark.range(10).repartition(1).foreachPartition(func)
     assert(sum.get() == 0) // The value is not 45
   }
+
+  test("Dataset reduce") {

Review Comment:
   @zhenlineo @hvanhovell @amaliujia I found an interesting thing about these two new cases, 'SimpleSparkConnectService' will be submitted as `local [*]`, and when the number of cores on the machine running the case is greater than 10, these two cases will failed as follows:
   
   ```
   Warning: Unable to serialize throwable of type io.grpc.StatusRuntimeException for TestFailed(Ordinal(0, 271),INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   	at org.apache.spark.scheduler.Task.run(Task.sca...,UserDefinedFunctionE2ETestSuite,org.apache.spark.sql.UserDefinedFunctionE2ETestSuite,Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),Dataset reduce,Dataset reduce,Vector(),Vector(),Some(io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   	at org.apache.spark.scheduler.Task.run(Task.sca...),Some(290),Some(IndentedText(- Dataset reduce,Dataset reduce,0)),Some(SeeStackDepthException),Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),None,pool-1-thread-1-ScalaTest-running-UserDefinedFunctionE2ETestSuite,1684472246665), setting it as NotSerializableWrapperException.
   [info] - Dataset reduce *** FAILED *** (290 milliseconds)
   [info]   io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   [info] 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   [info] 	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   [info] 	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   [info] 	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   [info] 	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   [info] 	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   [info] 	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   [info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   [info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   [info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   [info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   [info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   [info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   [info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   [info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   [info] 	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   [info] 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   [info] 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   [info] 	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   [info] 	at org.apache.spark.scheduler.Task.run(Task.sca...
   [info]   at io.grpc.Status.asRuntimeException(Status.java:535)
   [info]   at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:660)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:62)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:114)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:131)
   [info]   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2744)
   [info]   at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3184)
   [info]   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2743)
   [info]   at org.apache.spark.sql.Dataset.reduce(Dataset.scala:1292)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.$anonfun$new$34(UserDefinedFunctionE2ETestSuite.scala:212)
   [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   [info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
   [info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
   [info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   [info]   at scala.collection.immutable.List.foreach(List.scala:431)
   [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.org$scalatest$BeforeAndAfterAll$$super$run(UserDefinedFunctionE2ETestSuite.scala:35)
   [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.run(UserDefinedFunctionE2ETestSuite.scala:35)
   [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
   [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   [info]   at java.lang.Thread.run(Thread.java:750)
   ```
   
   I make a refactor work for these 2 case as https://github.com/apache/spark/pull/41227/files to make the problem reproducible by GA
   
   
   
    
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1202559551


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -197,4 +197,22 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
     spark.range(10).repartition(1).foreachPartition(func)
     assert(sum.get() == 0) // The value is not 45
   }
+
+  test("Dataset reduce") {

Review Comment:
   Thanks for report this error. I have the fix here https://github.com/apache/spark/pull/41264



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1184086351


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1867,7 +1928,34 @@ class SparkConnectPlanner(val session: SparkSession) {
     output.logicalPlan
   }
 
+  def transformKeyValueGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
+    val ds = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getGroupingExpressionsList,
+      java.util.Collections.emptyList())
+
+    val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq.map(expr => {
+      // Use any encoder as a placeholder to perform the TypedColumn#withInputType transformation
+      val any = ds.vEncoder
+      val newExpr = new TypedColumn(transformExpression(expr), any)
+        .withInputType(ds.vEncoder, ds.dataAttributes)
+        .expr
+      Column(newExpr).named
+    })
+    val keyColumn = logical.UntypedAggUtils.aggKeyColumn(ds.kEncoder, ds.groupingAttributes)
+    logical.Aggregate(ds.groupingAttributes, keyColumn +: namedColumns, ds.analyzed)
+  }
+
   private def transformAggregate(rel: proto.Aggregate): LogicalPlan = {
+    rel.getGroupType match {
+      case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBYKEY =>

Review Comment:
   To mark the input is `DummyUDF + groupingExprs` rather than `groupingExprs`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [WIP]Typed agg functions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1169172608


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -59,7 +59,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
           Utils.redact(session.sessionState.conf.stringRedactionPattern, v.toString)
         session.sparkContext.setLocalProperty(
           "callSite.short",
-          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
+          s"Spark Connect - session_id: '${v.getSessionId}'")

Review Comment:
   Unrelated change, this just make the logging clearly to debug.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -59,7 +59,7 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
           Utils.redact(session.sessionState.conf.stringRedactionPattern, v.toString)
         session.sparkContext.setLocalProperty(
           "callSite.short",
-          s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
+          s"Spark Connect - session_id: '${v.getSessionId}'")

Review Comment:
   Unrelated change, this just make the logging cleaner to debug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1184417644


##########
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -700,6 +684,34 @@ private[sql] object RelationalGroupedDataset {
     new RelationalGroupedDataset(df, groupingExprs, groupType: GroupType)
   }
 
+  private[sql] def handleGroupingExpression(
+      logicalPlan: LogicalPlan,
+      sparkSession: SparkSession,
+      groupingExprs: Seq[Expression]): (QueryExecution, Seq[Attribute]) = {
+    // Resolves grouping expressions.
+    val dummyPlan = Project(groupingExprs.map(alias), LocalRelation(logicalPlan.output))
+    val analyzedPlan = sparkSession.sessionState.analyzer.execute(dummyPlan)
+      .asInstanceOf[Project]
+    sparkSession.sessionState.analyzer.checkAnalysis(analyzedPlan)
+    val aliasedGroupings = analyzedPlan.projectList
+
+    // Adds the grouping expressions that are not in base DataFrame into outputs.
+    val addedCols = aliasedGroupings.filter(g => !logicalPlan.outputSet.contains(g.toAttribute))
+    val qe = Dataset.ofRows(
+      sparkSession,
+      Project(logicalPlan.output ++ addedCols, logicalPlan)).queryExecution
+
+    (qe, aliasedGroupings.map(_.toAttribute))
+  }
+
+  private def alias(expr: Expression): NamedExpression = expr match {

Review Comment:
   This is soooo subtle, I moved the aggExpr a bit earlier, then everything breaks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1184078816


##########
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -700,6 +684,34 @@ private[sql] object RelationalGroupedDataset {
     new RelationalGroupedDataset(df, groupingExprs, groupType: GroupType)
   }
 
+  private[sql] def handleGroupingExpression(
+      logicalPlan: LogicalPlan,
+      sparkSession: SparkSession,
+      groupingExprs: Seq[Expression]): (QueryExecution, Seq[Attribute]) = {
+    // Resolves grouping expressions.
+    val dummyPlan = Project(groupingExprs.map(alias), LocalRelation(logicalPlan.output))
+    val analyzedPlan = sparkSession.sessionState.analyzer.execute(dummyPlan)
+      .asInstanceOf[Project]
+    sparkSession.sessionState.analyzer.checkAnalysis(analyzedPlan)
+    val aliasedGroupings = analyzedPlan.projectList
+
+    // Adds the grouping expressions that are not in base DataFrame into outputs.
+    val addedCols = aliasedGroupings.filter(g => !logicalPlan.outputSet.contains(g.toAttribute))
+    val qe = Dataset.ofRows(
+      sparkSession,
+      Project(logicalPlan.output ++ addedCols, logicalPlan)).queryExecution
+
+    (qe, aliasedGroupings.map(_.toAttribute))
+  }
+
+  private def alias(expr: Expression): NamedExpression = expr match {

Review Comment:
   This adds `case u: UnresolvedFunction => UnresolvedAlias(expr, None)` mostly. Not sure why this is a special case here.
   
   Regarding  `TypedAggregate` logic, we can move it out around each groupingExprs (a few places, a bit out of the scope of this PR) and aggExprs (~easy~).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1184078816


##########
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -700,6 +684,34 @@ private[sql] object RelationalGroupedDataset {
     new RelationalGroupedDataset(df, groupingExprs, groupType: GroupType)
   }
 
+  private[sql] def handleGroupingExpression(
+      logicalPlan: LogicalPlan,
+      sparkSession: SparkSession,
+      groupingExprs: Seq[Expression]): (QueryExecution, Seq[Attribute]) = {
+    // Resolves grouping expressions.
+    val dummyPlan = Project(groupingExprs.map(alias), LocalRelation(logicalPlan.output))
+    val analyzedPlan = sparkSession.sessionState.analyzer.execute(dummyPlan)
+      .asInstanceOf[Project]
+    sparkSession.sessionState.analyzer.checkAnalysis(analyzedPlan)
+    val aliasedGroupings = analyzedPlan.projectList
+
+    // Adds the grouping expressions that are not in base DataFrame into outputs.
+    val addedCols = aliasedGroupings.filter(g => !logicalPlan.outputSet.contains(g.toAttribute))
+    val qe = Dataset.ofRows(
+      sparkSession,
+      Project(logicalPlan.output ++ addedCols, logicalPlan)).queryExecution
+
+    (qe, aliasedGroupings.map(_.toAttribute))
+  }
+
+  private def alias(expr: Expression): NamedExpression = expr match {

Review Comment:
   This adds `case u: UnresolvedFunction => UnresolvedAlias(expr, None)` mostly. Not sure why this is a special case here.
   
   Regarding  `TypedAggregate` logic, we can move it out around each groupingExprs (a few places, a bit out of the scope of this PR) and aggExprs (easy).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183177570


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -322,41 +471,45 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable
  * [[KeyValueGroupedDataset]] behaves on the server.
  */
 private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
-    private val ds: Dataset[IV],
     private val sparkSession: SparkSession,
     private val plan: proto.Plan,
     private val ikEncoder: AgnosticEncoder[IK],
     private val kEncoder: AgnosticEncoder[K],
-    private val groupingFunc: IV => IK,
-    private val valueMapFunc: IV => V)
+    private val ivEncoder: AgnosticEncoder[IV],
+    private val vEncoder: AgnosticEncoder[V],
+    private val groupingExprs: java.util.List[proto.Expression],
+    private val valueMapFunc: IV => V,
+    private val keysFunc: () => Dataset[IK])

Review Comment:
   I don't think we need this. This should always be `df.select(groupingExprs: _*).as(kEncoder)` or something equivalent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on PR #40796:
URL: https://github.com/apache/spark/pull/40796#issuecomment-1518390479

   Overall looks reasonable to me. I only have questions over the proto validation in the server side.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183913165


##########
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -700,6 +684,34 @@ private[sql] object RelationalGroupedDataset {
     new RelationalGroupedDataset(df, groupingExprs, groupType: GroupType)
   }
 
+  private[sql] def handleGroupingExpression(
+      logicalPlan: LogicalPlan,
+      sparkSession: SparkSession,
+      groupingExprs: Seq[Expression]): (QueryExecution, Seq[Attribute]) = {
+    // Resolves grouping expressions.
+    val dummyPlan = Project(groupingExprs.map(alias), LocalRelation(logicalPlan.output))
+    val analyzedPlan = sparkSession.sessionState.analyzer.execute(dummyPlan)
+      .asInstanceOf[Project]
+    sparkSession.sessionState.analyzer.checkAnalysis(analyzedPlan)
+    val aliasedGroupings = analyzedPlan.projectList
+
+    // Adds the grouping expressions that are not in base DataFrame into outputs.
+    val addedCols = aliasedGroupings.filter(g => !logicalPlan.outputSet.contains(g.toAttribute))
+    val qe = Dataset.ofRows(
+      sparkSession,
+      Project(logicalPlan.output ++ addedCols, logicalPlan)).queryExecution
+
+    (qe, aliasedGroupings.map(_.toAttribute))
+  }
+
+  private def alias(expr: Expression): NamedExpression = expr match {
+    case expr: NamedExpression => expr
+    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>

Review Comment:
   For the record this logic is also in `Column.generateAlias`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1184086351


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1867,7 +1928,34 @@ class SparkConnectPlanner(val session: SparkSession) {
     output.logicalPlan
   }
 
+  def transformKeyValueGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
+    val ds = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getGroupingExpressionsList,
+      java.util.Collections.emptyList())
+
+    val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq.map(expr => {
+      // Use any encoder as a placeholder to perform the TypedColumn#withInputType transformation
+      val any = ds.vEncoder
+      val newExpr = new TypedColumn(transformExpression(expr), any)
+        .withInputType(ds.vEncoder, ds.dataAttributes)
+        .expr
+      Column(newExpr).named
+    })
+    val keyColumn = logical.UntypedAggUtils.aggKeyColumn(ds.kEncoder, ds.groupingAttributes)
+    logical.Aggregate(ds.groupingAttributes, keyColumn +: namedColumns, ds.analyzed)
+  }
+
   private def transformAggregate(rel: proto.Aggregate): LogicalPlan = {
+    rel.getGroupType match {
+      case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBYKEY =>

Review Comment:
   To mark the input is `DummyUDF + typed groupingExprs` rather than `groupingExprs`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1190193381


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -322,41 +471,45 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable
  * [[KeyValueGroupedDataset]] behaves on the server.
  */
 private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
-    private val ds: Dataset[IV],
     private val sparkSession: SparkSession,
     private val plan: proto.Plan,
     private val ikEncoder: AgnosticEncoder[IK],
     private val kEncoder: AgnosticEncoder[K],
-    private val groupingFunc: IV => IK,
-    private val valueMapFunc: IV => V)
+    private val ivEncoder: AgnosticEncoder[IV],
+    private val vEncoder: AgnosticEncoder[V],
+    private val groupingExprs: java.util.List[proto.Expression],
+    private val valueMapFunc: IV => V,
+    private val keysFunc: () => Dataset[IK])

Review Comment:
   I do not see a better way to handle the keyFunc. If I pass it in, then I need to carry the grouping_udf and grouping_expressions in the raw format. Now I can just have a simple groupingExpr: proto.Expression. Happy to fix in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1190203009


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -414,3 +582,48 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
     udf.apply(inputEncoders.map(_ => col("*")): _*).expr.getCommonInlineUserDefinedFunction
   }
 }
+
+private object KeyValueGroupedDatasetImpl {
+  def apply[K, V](
+      ds: Dataset[V],
+      kEncoder: AgnosticEncoder[K],
+      groupingFunc: V => K): KeyValueGroupedDatasetImpl[K, V, K, V] = {
+    val gf = ScalarUserDefinedFunction(
+      function = groupingFunc,
+      inputEncoders = ds.encoder :: Nil, // Using the original value and key encoders
+      outputEncoder = kEncoder)
+    new KeyValueGroupedDatasetImpl(
+      ds.sparkSession,
+      ds.plan,
+      kEncoder,
+      kEncoder,
+      ds.encoder,
+      ds.encoder,
+      Arrays.asList(gf.apply(col("*")).expr),
+      UdfUtils.identical(),
+      () => ds.map(groupingFunc)(kEncoder))
+  }
+
+  def apply[K, V](
+      df: DataFrame,
+      kEncoder: AgnosticEncoder[K],
+      vEncoder: AgnosticEncoder[V],
+      groupingExprs: Seq[Column]): KeyValueGroupedDatasetImpl[K, V, K, V] = {
+    // Use a dummy udf to pass the K V encoders
+    val dummyGroupingFunc = ScalarUserDefinedFunction(
+      function = UdfUtils.noOp[V, K](),

Review Comment:
   If we can we should get rid of this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on PR #40796:
URL: https://github.com/apache/spark/pull/40796#issuecomment-1517025604

   cc @amaliujia Could you review the agg and reduce changes. a.k.a. the last two commits.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183185011


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -322,41 +471,45 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable
  * [[KeyValueGroupedDataset]] behaves on the server.
  */
 private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
-    private val ds: Dataset[IV],
     private val sparkSession: SparkSession,
     private val plan: proto.Plan,
     private val ikEncoder: AgnosticEncoder[IK],
     private val kEncoder: AgnosticEncoder[K],
-    private val groupingFunc: IV => IK,
-    private val valueMapFunc: IV => V)
+    private val ivEncoder: AgnosticEncoder[IV],
+    private val vEncoder: AgnosticEncoder[V],
+    private val groupingExprs: java.util.List[proto.Expression],
+    private val valueMapFunc: IV => V,
+    private val keysFunc: () => Dataset[IK])

Review Comment:
   Ok, I think I understand what you are trying to do here. It makes sense. In terms of structure I would prefer the following. Keep the `Dataset` value around, and define `keyFunc` as a transformation on that dataset, instead of turning it into something separate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183177942


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -414,3 +583,52 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
     udf.apply(inputEncoders.map(_ => col("*")): _*).expr.getCommonInlineUserDefinedFunction
   }
 }
+
+private object KeyValueGroupedDatasetImpl {
+  def apply[K, V](
+      ds: Dataset[V],
+      sparkSession: SparkSession,

Review Comment:
   Dataset has a sparkSession field?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183906392


##########
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -700,6 +684,34 @@ private[sql] object RelationalGroupedDataset {
     new RelationalGroupedDataset(df, groupingExprs, groupType: GroupType)
   }
 
+  private[sql] def handleGroupingExpression(
+      logicalPlan: LogicalPlan,
+      sparkSession: SparkSession,
+      groupingExprs: Seq[Expression]): (QueryExecution, Seq[Attribute]) = {
+    // Resolves grouping expressions.
+    val dummyPlan = Project(groupingExprs.map(alias), LocalRelation(logicalPlan.output))
+    val analyzedPlan = sparkSession.sessionState.analyzer.execute(dummyPlan)
+      .asInstanceOf[Project]
+    sparkSession.sessionState.analyzer.checkAnalysis(analyzedPlan)
+    val aliasedGroupings = analyzedPlan.projectList
+
+    // Adds the grouping expressions that are not in base DataFrame into outputs.
+    val addedCols = aliasedGroupings.filter(g => !logicalPlan.outputSet.contains(g.toAttribute))
+    val qe = Dataset.ofRows(
+      sparkSession,
+      Project(logicalPlan.output ++ addedCols, logicalPlan)).queryExecution
+
+    (qe, aliasedGroupings.map(_.toAttribute))
+  }
+
+  private def alias(expr: Expression): NamedExpression = expr match {

Review Comment:
   I mean this is sort of like the named functionality, but different.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183189886


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala:
##########
@@ -95,5 +95,11 @@ private[sql] object UdfUtils extends Serializable {
       }
   }
 
+  def mapReduceFuncToScalaFunc[T](func: ReduceFunction[T]): (T, T) => T = func.call
+
+  def groupAllUnderBoolTrue[T](): T => Boolean = _ => true
+
   def identical[T](): T => T = t => t
+
+  def as[V, K](): V => K = v => v.asInstanceOf[K]

Review Comment:
   This should be a no-op because of type erasure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183190815


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala:
##########
@@ -215,4 +214,54 @@ class KeyValueGroupedDatasetE2ETestSuite extends RemoteSparkSession {
         "4",
         ";9,8"))
   }
+
+  test("agg") {

Review Comment:
   Coverage seems a bit low. I am missing a couple of methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1181870990


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -37,15 +37,15 @@ import org.apache.spark.connect.proto
  */
 class RelationalGroupedDataset private[sql] (
     private[sql] val df: DataFrame,
-    private[sql] val groupingExprs: Seq[proto.Expression],
+    private[sql] val groupingExprs: Seq[Column],

Review Comment:
   What prompted the change to `Column`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1202145924


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -197,4 +197,22 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
     spark.range(10).repartition(1).foreachPartition(func)
     assert(sum.get() == 0) // The value is not 45
   }
+
+  test("Dataset reduce") {

Review Comment:
   ```java
   23/05/23 13:40:31 WARN GenerateUnsafeProjection: code for assertnotnull(input[0, scala.Tuple2, true])._1,assertnotnull(input[0, scala.Tuple2, true])._2:
   /* 001 */ public java.lang.Object generate(Object[] references) {
   /* 002 */   return new SpecificUnsafeProjection(references);
   /* 003 */ }
   /* 004 */
   /* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
   /* 006 */
   /* 007 */   private Object[] references;
   /* 008 */   private boolean subExprIsNull_0;
   /* 009 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
   /* 010 */   private scala.Tuple2[] mutableStateArray_0 = new scala.Tuple2[1];
   /* 011 */
   /* 012 */   public SpecificUnsafeProjection(Object[] references) {
   /* 013 */     this.references = references;
   /* 014 */
   /* 015 */     mutableStateArray_1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
   /* 016 */
   /* 017 */   }
   /* 018 */
   /* 019 */   public void initialize(int partitionIndex) {
   /* 020 */
   /* 021 */   }
   /* 022 */
   /* 023 */   // Scala.Function1 need this
   /* 024 */   public java.lang.Object apply(java.lang.Object row) {
   /* 025 */     return apply((InternalRow) row);
   /* 026 */   }
   /* 027 */
   /* 028 */   public UnsafeRow apply(InternalRow i) {
   /* 029 */     mutableStateArray_1[0].reset();
   /* 030 */     subExpr_0(i);
   /* 031 */
   /* 032 */     mutableStateArray_1[0].zeroOutNullBytes();
   /* 033 */
   /* 034 */     boolean isNull_2 = true;
   /* 035 */     boolean value_2 = false;
   /* 036 */     isNull_2 = false;
   /* 037 */     if (!isNull_2) {
   /* 038 */
   /* 039 */       Object funcResult_0 = null;
   /* 040 */       funcResult_0 = mutableStateArray_0[0]._1();
   /* 041 */       value_2 = (Boolean) funcResult_0;
   /* 042 */
   /* 043 */     }
   /* 044 */     mutableStateArray_1[0].write(0, value_2);
   /* 045 */
   /* 046 */     boolean isNull_3 = true;
   /* 047 */     long value_3 = -1L;
   /* 048 */     isNull_3 = false;
   /* 049 */     if (!isNull_3) {
   /* 050 */
   /* 051 */       Object funcResult_1 = null;
   /* 052 */       funcResult_1 = mutableStateArray_0[0]._2();
   /* 053 */       value_3 = (Long) funcResult_1;
   /* 054 */
   /* 055 */     }
   /* 056 */     mutableStateArray_1[0].write(1, value_3);
   /* 057 */     return (mutableStateArray_1[0].getRow());
   /* 058 */   }
   /* 059 */
   /* 060 */
   /* 061 */   private void subExpr_0(InternalRow i) {
   /* 062 */     boolean isNull_1 = i.isNullAt(0);
   /* 063 */     scala.Tuple2 value_1 = isNull_1 ?
   /* 064 */     null : ((scala.Tuple2)i.get(0, null));
   /* 065 */     if (isNull_1) {
   /* 066 */       throw new NullPointerException(((java.lang.String) references[0] /* errMsg */));
   /* 067 */     }
   /* 068 */     subExprIsNull_0 = false;
   /* 069 */     mutableStateArray_0[0] = value_1;
   /* 070 */   }
   /* 071 */
   /* 072 */ }
   ```
   NPE should be thrown from line 53 of generated code, and the input data should be `(false, null)` (ReduceAggregator.zero).
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on PR #40796:
URL: https://github.com/apache/spark/pull/40796#issuecomment-1544918604

   cc @hvanhovell @HyukjinKwon Can we merge this? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1174198988


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -545,34 +540,94 @@ class SparkConnectPlanner(val session: SparkSession) {
   private def transformTypedMapPartitions(
       fun: proto.CommonInlineUserDefinedFunction,
       child: LogicalPlan): LogicalPlan = {
-    val udf = fun.getScalarScalaUdf
-    val udfPacket =
-      Utils.deserialize[UdfPacket](
-        udf.getPayload.toByteArray,
-        SparkConnectArtifactManager.classLoaderWithArtifacts)
-    assert(udfPacket.inputEncoders.size == 1)
-    val iEnc = ExpressionEncoder(udfPacket.inputEncoders.head)
-    val rEnc = ExpressionEncoder(udfPacket.outputEncoder)
+    val udf = unpackUdf(fun)
+    assert(udf.inputEncoders.size == 1)
+    val iEnc = ExpressionEncoder(udf.inputEncoders.head)
+    val rEnc = ExpressionEncoder(udf.outputEncoder)
 
     val deserializer = UnresolvedDeserializer(iEnc.deserializer)
     val deserialized = DeserializeToObject(deserializer, generateObjAttr(iEnc), child)
     val mapped = MapPartitions(
-      udfPacket.function.asInstanceOf[Iterator[Any] => Iterator[Any]],
+      udf.function.asInstanceOf[Iterator[Any] => Iterator[Any]],
       generateObjAttr(rEnc),
       deserialized)
     SerializeFromObject(rEnc.namedExpressions, mapped)
   }
 
   private def transformGroupMap(rel: proto.GroupMap): LogicalPlan = {
-    val pythonUdf = transformPythonUDF(rel.getFunc)
-    val cols =
-      rel.getGroupingExpressionsList.asScala.toSeq.map(expr => Column(transformExpression(expr)))
+    val commonUdf = rel.getFunc
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedGroupMap(rel, commonUdf)
 
-    Dataset
-      .ofRows(session, transformRelation(rel.getInput))
-      .groupBy(cols: _*)
-      .flatMapGroupsInPandas(pythonUdf)
-      .logicalPlan
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val cols =
+          rel.getGroupingExpressionsList.asScala.toSeq.map(expr =>
+            Column(transformExpression(expr)))
+
+        Dataset
+          .ofRows(session, transformRelation(rel.getInput))
+          .groupBy(cols: _*)
+          .flatMapGroupsInPandas(pythonUdf)
+          .logicalPlan
+
+      case _ =>
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
+    }
+  }
+
+  private def transformTypedGroupMap(
+      rel: GroupMap,
+      commonUdf: CommonInlineUserDefinedFunction): LogicalPlan = {
+    // Compute grouping key
+    val logicalPlan = transformRelation(rel.getInput)
+    val udf = unpackUdf(commonUdf)
+    assert(rel.getGroupingExpressionsCount == 1)
+    val groupFunc = rel.getGroupingExpressionsList.asScala.toSeq
+      .map(expr => unpackUdf(expr.getCommonInlineUserDefinedFunction))
+      .head
+
+    assert(groupFunc.inputEncoders.size == 1)
+    val vEnc = ExpressionEncoder(groupFunc.inputEncoders.head)
+    val kEnc = ExpressionEncoder(groupFunc.outputEncoder)
+    val uEnc = ExpressionEncoder(udf.outputEncoder)
+    assert(udf.inputEncoders.nonEmpty)
+    // ukEnc != kEnc if user has called kvDS.keyAs
+    val ukEnc = ExpressionEncoder(udf.inputEncoders.head)
+
+    val withGroupingKey = new AppendColumns(
+      groupFunc.function.asInstanceOf[Any => Any],
+      vEnc.clsTag.runtimeClass,
+      vEnc.schema,
+      UnresolvedDeserializer(vEnc.deserializer),
+      kEnc.namedExpressions,
+      logicalPlan)
+
+    // Compute sort order
+    val sortExprs =
+      rel.getSortingExpressionsList.asScala.toSeq.map(expr => transformExpression(expr))
+    val sortOrder: Seq[SortOrder] = sortExprs.map {
+      case expr: SortOrder => expr
+      case expr: Expression => SortOrder(expr, Ascending)

Review Comment:
   Who will be in charge of checking non-supported expr? 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -545,34 +540,94 @@ class SparkConnectPlanner(val session: SparkSession) {
   private def transformTypedMapPartitions(
       fun: proto.CommonInlineUserDefinedFunction,
       child: LogicalPlan): LogicalPlan = {
-    val udf = fun.getScalarScalaUdf
-    val udfPacket =
-      Utils.deserialize[UdfPacket](
-        udf.getPayload.toByteArray,
-        SparkConnectArtifactManager.classLoaderWithArtifacts)
-    assert(udfPacket.inputEncoders.size == 1)
-    val iEnc = ExpressionEncoder(udfPacket.inputEncoders.head)
-    val rEnc = ExpressionEncoder(udfPacket.outputEncoder)
+    val udf = unpackUdf(fun)
+    assert(udf.inputEncoders.size == 1)
+    val iEnc = ExpressionEncoder(udf.inputEncoders.head)
+    val rEnc = ExpressionEncoder(udf.outputEncoder)
 
     val deserializer = UnresolvedDeserializer(iEnc.deserializer)
     val deserialized = DeserializeToObject(deserializer, generateObjAttr(iEnc), child)
     val mapped = MapPartitions(
-      udfPacket.function.asInstanceOf[Iterator[Any] => Iterator[Any]],
+      udf.function.asInstanceOf[Iterator[Any] => Iterator[Any]],
       generateObjAttr(rEnc),
       deserialized)
     SerializeFromObject(rEnc.namedExpressions, mapped)
   }
 
   private def transformGroupMap(rel: proto.GroupMap): LogicalPlan = {
-    val pythonUdf = transformPythonUDF(rel.getFunc)
-    val cols =
-      rel.getGroupingExpressionsList.asScala.toSeq.map(expr => Column(transformExpression(expr)))
+    val commonUdf = rel.getFunc
+    commonUdf.getFunctionCase match {
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.SCALAR_SCALA_UDF =>
+        transformTypedGroupMap(rel, commonUdf)
 
-    Dataset
-      .ofRows(session, transformRelation(rel.getInput))
-      .groupBy(cols: _*)
-      .flatMapGroupsInPandas(pythonUdf)
-      .logicalPlan
+      case proto.CommonInlineUserDefinedFunction.FunctionCase.PYTHON_UDF =>
+        val pythonUdf = transformPythonUDF(commonUdf)
+        val cols =
+          rel.getGroupingExpressionsList.asScala.toSeq.map(expr =>
+            Column(transformExpression(expr)))
+
+        Dataset
+          .ofRows(session, transformRelation(rel.getInput))
+          .groupBy(cols: _*)
+          .flatMapGroupsInPandas(pythonUdf)
+          .logicalPlan
+
+      case _ =>
+        throw InvalidPlanInput(
+          s"Function with ID: ${commonUdf.getFunctionCase.getNumber} is not supported")
+    }
+  }
+
+  private def transformTypedGroupMap(
+      rel: GroupMap,
+      commonUdf: CommonInlineUserDefinedFunction): LogicalPlan = {
+    // Compute grouping key
+    val logicalPlan = transformRelation(rel.getInput)
+    val udf = unpackUdf(commonUdf)
+    assert(rel.getGroupingExpressionsCount == 1)
+    val groupFunc = rel.getGroupingExpressionsList.asScala.toSeq
+      .map(expr => unpackUdf(expr.getCommonInlineUserDefinedFunction))
+      .head
+
+    assert(groupFunc.inputEncoders.size == 1)
+    val vEnc = ExpressionEncoder(groupFunc.inputEncoders.head)
+    val kEnc = ExpressionEncoder(groupFunc.outputEncoder)
+    val uEnc = ExpressionEncoder(udf.outputEncoder)
+    assert(udf.inputEncoders.nonEmpty)
+    // ukEnc != kEnc if user has called kvDS.keyAs
+    val ukEnc = ExpressionEncoder(udf.inputEncoders.head)
+
+    val withGroupingKey = new AppendColumns(
+      groupFunc.function.asInstanceOf[Any => Any],
+      vEnc.clsTag.runtimeClass,
+      vEnc.schema,
+      UnresolvedDeserializer(vEnc.deserializer),
+      kEnc.namedExpressions,
+      logicalPlan)
+
+    // Compute sort order
+    val sortExprs =
+      rel.getSortingExpressionsList.asScala.toSeq.map(expr => transformExpression(expr))
+    val sortOrder: Seq[SortOrder] = sortExprs.map {
+      case expr: SortOrder => expr
+      case expr: Expression => SortOrder(expr, Ascending)

Review Comment:
   Who will in charge of checking non-supported expr? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell closed pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as
URL: https://github.com/apache/spark/pull/40796


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183162754


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -1271,10 +1268,35 @@ class Dataset[T] private[sql] (
     val colNames: Seq[String] = col1 +: cols
     new RelationalGroupedDataset(
       toDF(),
-      colNames.map(colName => Column(colName).expr),
+      colNames.map(colName => Column(colName)),
       proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
   }
 
+  /**
+   * (Scala-specific) Reduces the elements of this Dataset using the specified binary function.
+   * The given `func` must be commutative and associative or the result may be non-deterministic.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def reduce(func: (T, T) => T): T = {
+    val list = this
+      .groupByKey(UdfUtils.groupAllUnderBoolTrue())(PrimitiveBooleanEncoder)

Review Comment:
   The current code path is rdd ops. It does not go via ReduceAggregator.
   
   If we want to go via ReduceAggregator, we need to make the agg to support `RelationalGroupedDataset#agg(TypedColumn)`, which is missing today. That code path will actually ignore Dataset[T] and always use `RowEncoder` instead. Following this logic, the only problem left is how could we tell the TypedCol apart from Col as no more extra info needed to pass to the server?
   
   Suggest leave it as a TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183888685


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala:
##########
@@ -703,3 +703,20 @@ case class CoGroup(
   override protected def withNewChildrenInternal(
       newLeft: LogicalPlan, newRight: LogicalPlan): CoGroup = copy(left = newLeft, right = newRight)
 }
+
+private[sql] object UntypedAggUtils {

Review Comment:
   I am not sure this in right location... It is kind of unrelated to the actual expression. Maybe move it to `org.apache.spark.sql.internal` or something like that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183178453


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -1271,10 +1268,35 @@ class Dataset[T] private[sql] (
     val colNames: Seq[String] = col1 +: cols
     new RelationalGroupedDataset(
       toDF(),
-      colNames.map(colName => Column(colName).expr),
+      colNames.map(colName => Column(colName)),
       proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
   }
 
+  /**
+   * (Scala-specific) Reduces the elements of this Dataset using the specified binary function.
+   * The given `func` must be commutative and associative or the result may be non-deterministic.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def reduce(func: (T, T) => T): T = {
+    val list = this
+      .groupByKey(UdfUtils.groupAllUnderBoolTrue())(PrimitiveBooleanEncoder)

Review Comment:
   SGTM please file a ticket.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1184408288


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -664,7 +665,53 @@ class SparkConnectPlanner(val session: SparkSession) {
         input: proto.Relation,
         groupingExprs: java.util.List[proto.Expression],
         sortingExprs: java.util.List[proto.Expression]): UntypedKeyValueGroupedDataset = {
-      val logicalPlan = transformRelation(input)
+      apply(transformRelation(input), groupingExprs, sortingExprs)
+    }
+
+    private def apply(
+        logicalPlan: LogicalPlan,
+        groupingExprs: java.util.List[proto.Expression],
+        sortingExprs: java.util.List[proto.Expression]): UntypedKeyValueGroupedDataset = {
+      if (groupingExprs.size() == 1) {
+        createFromGroupByKeyFunc(logicalPlan, groupingExprs, sortingExprs)
+      } else if (groupingExprs.size() > 1) {

Review Comment:
   I do not see a common path here. The nasty part of this is we hide a logic inside the grouping_exprs using the count of the expressions. The alternative I can think is a UnresolvedFunc or new Expression which allow us to add more logics. e.g.
   ```
   message KeyValueGroupedDataset { // New Expression or Unresolved Func
    // (Required) Input user-defined function. Defines the grouping func
    CommonInlineUserDefinedFunction grouping_func = 1;
   
    // (Optional) Extra grouping expressions needed for RelationalGroupedDataset
    repeat Expression grouping_expressions = 2;
   }
   ``` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183902935


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1867,7 +1928,34 @@ class SparkConnectPlanner(val session: SparkSession) {
     output.logicalPlan
   }
 
+  def transformKeyValueGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
+    val ds = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getGroupingExpressionsList,
+      java.util.Collections.emptyList())
+
+    val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq.map(expr => {
+      // Use any encoder as a placeholder to perform the TypedColumn#withInputType transformation
+      val any = ds.vEncoder
+      val newExpr = new TypedColumn(transformExpression(expr), any)
+        .withInputType(ds.vEncoder, ds.dataAttributes)
+        .expr
+      Column(newExpr).named
+    })
+    val keyColumn = logical.UntypedAggUtils.aggKeyColumn(ds.kEncoder, ds.groupingAttributes)
+    logical.Aggregate(ds.groupingAttributes, keyColumn +: namedColumns, ds.analyzed)
+  }
+
   private def transformAggregate(rel: proto.Aggregate): LogicalPlan = {
+    rel.getGroupType match {
+      case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBYKEY =>

Review Comment:
   Why do we need this special case?



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1867,7 +1928,34 @@ class SparkConnectPlanner(val session: SparkSession) {
     output.logicalPlan
   }
 
+  def transformKeyValueGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
+    val ds = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getGroupingExpressionsList,
+      java.util.Collections.emptyList())
+
+    val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq.map(expr => {
+      // Use any encoder as a placeholder to perform the TypedColumn#withInputType transformation
+      val any = ds.vEncoder
+      val newExpr = new TypedColumn(transformExpression(expr), any)
+        .withInputType(ds.vEncoder, ds.dataAttributes)
+        .expr
+      Column(newExpr).named
+    })
+    val keyColumn = logical.UntypedAggUtils.aggKeyColumn(ds.kEncoder, ds.groupingAttributes)
+    logical.Aggregate(ds.groupingAttributes, keyColumn +: namedColumns, ds.analyzed)
+  }
+
   private def transformAggregate(rel: proto.Aggregate): LogicalPlan = {
+    rel.getGroupType match {
+      case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBYKEY =>

Review Comment:
   Why do we need this special case? Only for naming?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #40796:
URL: https://github.com/apache/spark/pull/40796#issuecomment-1548042425

   Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183186633


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -1271,10 +1268,35 @@ class Dataset[T] private[sql] (
     val colNames: Seq[String] = col1 +: cols
     new RelationalGroupedDataset(
       toDF(),
-      colNames.map(colName => Column(colName).expr),
+      colNames.map(colName => Column(colName)),
       proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
   }
 
+  /**
+   * (Scala-specific) Reduces the elements of this Dataset using the specified binary function.
+   * The given `func` must be commutative and associative or the result may be non-deterministic.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def reduce(func: (T, T) => T): T = {
+    val list = this
+      .groupByKey(UdfUtils.groupAllUnderBoolTrue())(PrimitiveBooleanEncoder)

Review Comment:
   How about `df.groupBy().as[Unit, T].reduceGroups(func).as[T].head`? That should stop us from submitting an aggregate with a group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183188030


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -414,3 +583,52 @@ private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
     udf.apply(inputEncoders.map(_ => col("*")): _*).expr.getCommonInlineUserDefinedFunction
   }
 }
+
+private object KeyValueGroupedDatasetImpl {
+  def apply[K, V](
+      ds: Dataset[V],
+      sparkSession: SparkSession,
+      plan: Plan,

Review Comment:
   Dataset has a plan field. TBH i think you should just hold on to the Dataset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183900469


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1867,7 +1928,34 @@ class SparkConnectPlanner(val session: SparkSession) {
     output.logicalPlan
   }
 
+  def transformKeyValueGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
+    val ds = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getGroupingExpressionsList,
+      java.util.Collections.emptyList())
+
+    val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq.map(expr => {
+      // Use any encoder as a placeholder to perform the TypedColumn#withInputType transformation
+      val any = ds.vEncoder
+      val newExpr = new TypedColumn(transformExpression(expr), any)
+        .withInputType(ds.vEncoder, ds.dataAttributes)
+        .expr
+      Column(newExpr).named

Review Comment:
   Can we just put the named functionality in a separate util function instead of creating a Typed column here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183914885


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1867,7 +1928,34 @@ class SparkConnectPlanner(val session: SparkSession) {
     output.logicalPlan
   }
 
+  def transformKeyValueGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
+    val ds = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getGroupingExpressionsList,
+      java.util.Collections.emptyList())
+
+    val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq.map(expr => {

Review Comment:
   No, I can only think of using `UnresolvedFunc` to carry both `mapValues` + `aggExprs`. Any better ideas are most welcome.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183117935


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -37,15 +37,15 @@ import org.apache.spark.connect.proto
  */
 class RelationalGroupedDataset private[sql] (
     private[sql] val df: DataFrame,
-    private[sql] val groupingExprs: Seq[proto.Expression],
+    private[sql] val groupingExprs: Seq[Column],

Review Comment:
   To simply the `keysFunc` when creating `KeyValueGroupedDatasetImpl` ([code](https://github.com/apache/spark/pull/40796/files#diff-91fc5eb4a3a5099e53b0e14b27b4cfc427c2df98612844f214a5c778b36d797fR632)):
   ```
       new KeyValueGroupedDatasetImpl(
         sparkSession,
         plan,
         kEncoder,
         kEncoder,
         vEncoder,
         vEncoder,
         (Seq(dummyGroupingFunc) ++ groupingExprs).map(_.expr).asJava,
         UdfUtils.identical(),
         () => df.select(groupingExprs: _*).as(kEncoder)) // To avoid creating new cols again.
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183946902


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -664,7 +665,53 @@ class SparkConnectPlanner(val session: SparkSession) {
         input: proto.Relation,
         groupingExprs: java.util.List[proto.Expression],
         sortingExprs: java.util.List[proto.Expression]): UntypedKeyValueGroupedDataset = {
-      val logicalPlan = transformRelation(input)
+      apply(transformRelation(input), groupingExprs, sortingExprs)
+    }
+
+    private def apply(
+        logicalPlan: LogicalPlan,
+        groupingExprs: java.util.List[proto.Expression],
+        sortingExprs: java.util.List[proto.Expression]): UntypedKeyValueGroupedDataset = {
+      if (groupingExprs.size() == 1) {
+        createFromGroupByKeyFunc(logicalPlan, groupingExprs, sortingExprs)
+      } else if (groupingExprs.size() > 1) {

Review Comment:
   This feels a bit dirty. I understand that the handling the grouping function seems a bit different, but I wonder if we need the special single function result flattening behavior in the grand scheme of things, especially when you consider we are going to end up with a tupled dataset anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1190383894


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1917,6 +1991,37 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
   }
 
+  private def transformTypedReduceExpression(
+      fun: proto.Expression.UnresolvedFunction,
+      dataAttributes: Seq[Attribute]): Expression = {
+    assert(fun.getFunctionName == "reduce")
+    if (fun.getArgumentsCount != 1) {
+      throw InvalidPlanInput("reduce requires single child expression")
+    }
+    val udf = fun.getArgumentsList.asScala.toSeq.head match {
+      case expr
+          if expr.hasCommonInlineUserDefinedFunction
+            && expr.getCommonInlineUserDefinedFunction.hasScalarScalaUdf =>
+        TypedScalaUdf(expr.getCommonInlineUserDefinedFunction)
+      case other =>
+        throw InvalidPlanInput(s"reduce should be a scalar scala udf, but got $other")

Review Comment:
   Does it really have to be a typed scala udf? I mean it just needs to a 2-ary UDF.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1190244437


##########
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##########
@@ -700,6 +684,34 @@ private[sql] object RelationalGroupedDataset {
     new RelationalGroupedDataset(df, groupingExprs, groupType: GroupType)
   }
 
+  private[sql] def handleGroupingExpression(
+      logicalPlan: LogicalPlan,
+      sparkSession: SparkSession,
+      groupingExprs: Seq[Expression]): (QueryExecution, Seq[Attribute]) = {
+    // Resolves grouping expressions.
+    val dummyPlan = Project(groupingExprs.map(alias), LocalRelation(logicalPlan.output))
+    val analyzedPlan = sparkSession.sessionState.analyzer.execute(dummyPlan)
+      .asInstanceOf[Project]
+    sparkSession.sessionState.analyzer.checkAnalysis(analyzedPlan)
+    val aliasedGroupings = analyzedPlan.projectList
+
+    // Adds the grouping expressions that are not in base DataFrame into outputs.
+    val addedCols = aliasedGroupings.filter(g => !logicalPlan.outputSet.contains(g.toAttribute))
+    val qe = Dataset.ofRows(
+      sparkSession,
+      Project(logicalPlan.output ++ addedCols, logicalPlan)).queryExecution
+
+    (qe, aliasedGroupings.map(_.toAttribute))
+  }
+
+  private def alias(expr: Expression): NamedExpression = expr match {

Review Comment:
   We should follow up on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1190186510


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -322,41 +471,45 @@ abstract class KeyValueGroupedDataset[K, V] private[sql] () extends Serializable
  * [[KeyValueGroupedDataset]] behaves on the server.
  */
 private class KeyValueGroupedDatasetImpl[K, V, IK, IV](
-    private val ds: Dataset[IV],
     private val sparkSession: SparkSession,
     private val plan: proto.Plan,
     private val ikEncoder: AgnosticEncoder[IK],
     private val kEncoder: AgnosticEncoder[K],
-    private val groupingFunc: IV => IK,
-    private val valueMapFunc: IV => V)
+    private val ivEncoder: AgnosticEncoder[IV],
+    private val vEncoder: AgnosticEncoder[V],
+    private val groupingExprs: java.util.List[proto.Expression],
+    private val valueMapFunc: IV => V,
+    private val keysFunc: () => Dataset[IK])

Review Comment:
   @zhenlineo ^^^ what about this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] LuciferYang commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "LuciferYang (via GitHub)" <gi...@apache.org>.
LuciferYang commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1202132810


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -197,4 +197,22 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
     spark.range(10).repartition(1).foreachPartition(func)
     assert(sum.get() == 0) // The value is not 45
   }
+
+  test("Dataset reduce") {

Review Comment:
   @zhenlineo @hvanhovell @amaliujia I found an interesting thing about these two new cases, `SimpleSparkConnectService` will be submitted as `local [*]`, and when the number of cores on the machine running the case is greater than 10, these two cases will failed as follows:
   
   ```
   Warning: Unable to serialize throwable of type io.grpc.StatusRuntimeException for TestFailed(Ordinal(0, 271),INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   	at org.apache.spark.scheduler.Task.run(Task.sca...,UserDefinedFunctionE2ETestSuite,org.apache.spark.sql.UserDefinedFunctionE2ETestSuite,Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),Dataset reduce,Dataset reduce,Vector(),Vector(),Some(io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   	at org.apache.spark.scheduler.Task.run(Task.sca...),Some(290),Some(IndentedText(- Dataset reduce,Dataset reduce,0)),Some(SeeStackDepthException),Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),None,pool-1-thread-1-ScalaTest-running-UserDefinedFunctionE2ETestSuite,1684472246665), setting it as NotSerializableWrapperException.
   [info] - Dataset reduce *** FAILED *** (290 milliseconds)
   [info]   io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   [info] 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   [info] 	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   [info] 	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   [info] 	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   [info] 	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   [info] 	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   [info] 	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   [info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   [info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   [info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   [info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   [info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   [info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   [info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   [info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   [info] 	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   [info] 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   [info] 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   [info] 	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   [info] 	at org.apache.spark.scheduler.Task.run(Task.sca...
   [info]   at io.grpc.Status.asRuntimeException(Status.java:535)
   [info]   at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:660)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:62)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:114)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:131)
   [info]   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2744)
   [info]   at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3184)
   [info]   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2743)
   [info]   at org.apache.spark.sql.Dataset.reduce(Dataset.scala:1292)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.$anonfun$new$34(UserDefinedFunctionE2ETestSuite.scala:212)
   [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   [info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
   [info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
   [info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   [info]   at scala.collection.immutable.List.foreach(List.scala:431)
   [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.org$scalatest$BeforeAndAfterAll$$super$run(UserDefinedFunctionE2ETestSuite.scala:35)
   [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.run(UserDefinedFunctionE2ETestSuite.scala:35)
   [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
   [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   [info]   at java.lang.Thread.run(Thread.java:750)
   ```
   
   I make a refactor work for these 2 case as https://github.com/apache/spark/pull/41227/files to make the problem reproducible by GA
   
   
   
    
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhenlineo commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "zhenlineo (via GitHub)" <gi...@apache.org>.
zhenlineo commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1184086351


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1867,7 +1928,34 @@ class SparkConnectPlanner(val session: SparkSession) {
     output.logicalPlan
   }
 
+  def transformKeyValueGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
+    val ds = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getGroupingExpressionsList,
+      java.util.Collections.emptyList())
+
+    val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq.map(expr => {
+      // Use any encoder as a placeholder to perform the TypedColumn#withInputType transformation
+      val any = ds.vEncoder
+      val newExpr = new TypedColumn(transformExpression(expr), any)
+        .withInputType(ds.vEncoder, ds.dataAttributes)
+        .expr
+      Column(newExpr).named
+    })
+    val keyColumn = logical.UntypedAggUtils.aggKeyColumn(ds.kEncoder, ds.groupingAttributes)
+    logical.Aggregate(ds.groupingAttributes, keyColumn +: namedColumns, ds.analyzed)
+  }
+
   private def transformAggregate(rel: proto.Aggregate): LogicalPlan = {
+    rel.getGroupType match {
+      case proto.Aggregate.GroupType.GROUP_TYPE_GROUPBYKEY =>

Review Comment:
   This is the only way to mark KVGDS#agg from RGDS#agg.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183916217


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -664,7 +665,53 @@ class SparkConnectPlanner(val session: SparkSession) {
         input: proto.Relation,
         groupingExprs: java.util.List[proto.Expression],
         sortingExprs: java.util.List[proto.Expression]): UntypedKeyValueGroupedDataset = {
-      val logicalPlan = transformRelation(input)
+      apply(transformRelation(input), groupingExprs, sortingExprs)
+    }
+
+    private def apply(
+        logicalPlan: LogicalPlan,
+        groupingExprs: java.util.List[proto.Expression],
+        sortingExprs: java.util.List[proto.Expression]): UntypedKeyValueGroupedDataset = {
+      if (groupingExprs.size() == 1) {
+        createFromGroupByKeyFunc(logicalPlan, groupingExprs, sortingExprs)
+      } else if (groupingExprs.size() > 1) {
+        createFromRelationalDataset(logicalPlan, groupingExprs, sortingExprs)
+      } else {
+        throw InvalidPlanInput(
+          "The grouping expression cannot be absent for KeyValueGroupedDataset")
+      }
+    }
+
+    private def createFromRelationalDataset(
+        logicalPlan: LogicalPlan,
+        groupingExprs: java.util.List[proto.Expression],
+        sortingExprs: java.util.List[proto.Expression]): UntypedKeyValueGroupedDataset = {
+      assert(groupingExprs.size() >= 1)
+      val dummyFunc = unpackUdf(groupingExprs.get(0).getCommonInlineUserDefinedFunction)
+      val groupExprs = groupingExprs.asScala.toSeq.drop(1).map(expr => transformExpression(expr))
+
+      val vEnc = ExpressionEncoder(dummyFunc.inputEncoders.head)
+      val kEnc = ExpressionEncoder(dummyFunc.outputEncoder)
+
+      val (qe, aliasedGroupings) =

Review Comment:
   As a general guideline, the more we can avoid intermediate Dataset/QueryExecution objects the better. The `analyzer` should take care of most of the work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1183901421


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1867,7 +1928,34 @@ class SparkConnectPlanner(val session: SparkSession) {
     output.logicalPlan
   }
 
+  def transformKeyValueGroupedAggregate(rel: proto.Aggregate): LogicalPlan = {
+    val ds = UntypedKeyValueGroupedDataset(
+      rel.getInput,
+      rel.getGroupingExpressionsList,
+      java.util.Collections.emptyList())
+
+    val namedColumns = rel.getAggregateExpressionsList.asScala.toSeq.map(expr => {

Review Comment:
   This currently does not work with mapValues right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1181870460


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -1271,10 +1268,35 @@ class Dataset[T] private[sql] (
     val colNames: Seq[String] = col1 +: cols
     new RelationalGroupedDataset(
       toDF(),
-      colNames.map(colName => Column(colName).expr),
+      colNames.map(colName => Column(colName)),
       proto.Aggregate.GroupType.GROUP_TYPE_GROUPBY)
   }
 
+  /**
+   * (Scala-specific) Reduces the elements of this Dataset using the specified binary function.
+   * The given `func` must be commutative and associative or the result may be non-deterministic.
+   *
+   * @group action
+   * @since 3.5.0
+   */
+  def reduce(func: (T, T) => T): T = {
+    val list = this
+      .groupByKey(UdfUtils.groupAllUnderBoolTrue())(PrimitiveBooleanEncoder)

Review Comment:
   Not sure if this is a stellar idea, the problem is that some aggregation implementations are optimized for keyless aggregation. By adding a dummy key (which is very hard to detect due to the use of a udf), we won't be able to use these code paths. Can we try to use the ReduceAggregator directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org