Posted to reviews@spark.apache.org by "LuciferYang (via GitHub)" <gi...@apache.org> on 2023/05/23 11:32:16 UTC

[GitHub] [spark] LuciferYang commented on a diff in pull request #40796: [SPARK-43223][Connect] Typed agg, reduce functions, RelationalGroupedDataset#as

LuciferYang commented on code in PR #40796:
URL: https://github.com/apache/spark/pull/40796#discussion_r1202132810


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala:
##########
@@ -197,4 +197,22 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession {
     spark.range(10).repartition(1).foreachPartition(func)
     assert(sum.get() == 0) // The value is not 45
   }
+
+  test("Dataset reduce") {

Review Comment:
   @zhenlineo @hvanhovell @amaliujia I found an interesting issue with these two new cases: `SimpleSparkConnectService` is submitted with master `local[*]`, so when the machine running the tests has more than 10 cores, both cases fail as follows (presumably because `spark.range(10)` then gets more partitions than rows, leaving some partitions empty; note that the trace fails in `ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput` while serializing the aggregation buffer):
   
   ```
   Warning: Unable to serialize throwable of type io.grpc.StatusRuntimeException for TestFailed(Ordinal(0, 271),INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   	at org.apache.spark.scheduler.Task.run(Task.sca...,UserDefinedFunctionE2ETestSuite,org.apache.spark.sql.UserDefinedFunctionE2ETestSuite,Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),Dataset reduce,Dataset reduce,Vector(),Vector(),Some(io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   	at org.apache.spark.scheduler.Task.run(Task.sca...),Some(290),Some(IndentedText(- Dataset reduce,Dataset reduce,0)),Some(SeeStackDepthException),Some(org.apache.spark.sql.UserDefinedFunctionE2ETestSuite),None,pool-1-thread-1-ScalaTest-running-UserDefinedFunctionE2ETestSuite,1684472246665), setting it as NotSerializableWrapperException.
   [info] - Dataset reduce *** FAILED *** (290 milliseconds)
   [info]   io.grpc.StatusRuntimeException: INTERNAL: Job aborted due to stage failure: Task 0 in stage 150.0 failed 1 times, most recent failure: Lost task 0.0 in stage 150.0 (TID 316) (localhost executor driver): java.lang.NullPointerException
   [info] 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:53)
   [info] 	at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.serialize(TypedAggregateExpression.scala:267)
   [info] 	at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:620)
   [info] 	at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateResultProjection$6(AggregationIterator.scala:280)
   [info] 	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.outputForEmptyGroupingKeyWithoutInput(ObjectAggregationIterator.scala:107)
   [info] 	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:117)
   [info] 	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
   [info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877)
   [info] 	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877)
   [info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   [info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   [info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   [info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   [info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
   [info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
   [info] 	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   [info] 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
   [info] 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
   [info] 	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
   [info] 	at org.apache.spark.scheduler.Task.run(Task.sca...
   [info]   at io.grpc.Status.asRuntimeException(Status.java:535)
   [info]   at io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:660)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:62)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:114)
   [info]   at org.apache.spark.sql.connect.client.SparkResult.toArray(SparkResult.scala:131)
   [info]   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2744)
   [info]   at org.apache.spark.sql.Dataset.withResult(Dataset.scala:3184)
   [info]   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2743)
   [info]   at org.apache.spark.sql.Dataset.reduce(Dataset.scala:1292)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.$anonfun$new$34(UserDefinedFunctionE2ETestSuite.scala:212)
   [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
   [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   [info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
   [info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
   [info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
   [info]   at scala.collection.immutable.List.foreach(List.scala:431)
   [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
   [info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
   [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
   [info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.Suite.run(Suite.scala:1114)
   [info]   at org.scalatest.Suite.run$(Suite.scala:1096)
   [info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
   [info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.org$scalatest$BeforeAndAfterAll$$super$run(UserDefinedFunctionE2ETestSuite.scala:35)
   [info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
   [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   [info]   at org.apache.spark.sql.UserDefinedFunctionE2ETestSuite.run(UserDefinedFunctionE2ETestSuite.scala:35)
   [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
   [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
   [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
   [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   [info]   at java.lang.Thread.run(Thread.java:750)
   ```
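   
   For context, here is a minimal local sketch (mine, not from the PR) of why the core count matters: under `local[*]`, `spark.range(10)` defaults to one partition per core, so on a machine with more than 10 cores some partitions are necessarily empty. `local[16]` below stands in for such a machine; this only illustrates the partition math, not the Connect code path that hits the NPE.
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   object PartitionCountSketch {
     def main(args: Array[String]): Unit = {
       // local[16] stands in for `local[*]` on a machine with more than 10 cores.
       val spark = SparkSession.builder()
         .master("local[16]")
         .appName("partition-count-sketch")
         .getOrCreate()
       // Default parallelism equals the core count, so range(10) is split into
       // 16 partitions and at least 6 of them contain no rows at all.
       println(spark.range(10).rdd.getNumPartitions) // 16
       spark.stop()
     }
   }
   ```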
   
   I refactored these two cases in https://github.com/apache/spark/pull/41227/files so that the problem is reproducible in GA (GitHub Actions).
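   
   A hedged sketch of that idea (my wording, not the actual diff in #41227): force more partitions than rows explicitly, so the empty-partition condition no longer depends on the machine's core count.
   
   ```scala
   // Hypothetical test sketch; `spark` is the suite's RemoteSparkSession.
   test("Dataset reduce") {
     // 12 partitions for 10 rows guarantees some empty partitions,
     // reproducing the >10-core condition on any machine, including GA runners.
     val ds = spark.range(10).repartition(12)
     assert(ds.reduce((a, b) => a + b) == 45)
   }
   ```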



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

