Posted to commits@spark.apache.org by ma...@apache.org on 2022/09/23 11:23:08 UTC

[spark] branch branch-3.3 updated: [SPARK-40535][SQL] Fix bug where the buffer of AggregatingAccumulator will not be created if the input rows are empty

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new fb42c3ecd73 [SPARK-40535][SQL] Fix bug where the buffer of AggregatingAccumulator will not be created if the input rows are empty
fb42c3ecd73 is described below

commit fb42c3ecd7395afeb871a9d782d3844eda7f44f4
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Sep 23 14:22:09 2022 +0300

    [SPARK-40535][SQL] Fix bug where the buffer of AggregatingAccumulator will not be created if the input rows are empty
    
    ### What changes were proposed in this pull request?
    When `AggregatingAccumulator` serializes its aggregate buffer, it may throw a NullPointerException.
    The following test case reproduces the error. Note that `spark.range(1, 10, 1, 10)` spreads 9 rows over 10 partitions, so at least one partition is empty:
    ```
    val namedObservation = Observation("named")
    val df = spark.range(1, 10, 1, 10)
    val observed_df = df.observe(
      namedObservation, percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val"))
    observed_df.collect()
    ```
    It throws the following exception:
    ```
    13:45:10.976 ERROR org.apache.spark.util.Utils: Exception encountered
    java.lang.NullPointerException
            at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:641)
            at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:602)
            at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624)
            at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:205)
            at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33)
            at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:186)
            at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1245)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:55)
            at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:55)
            at scala.collection.Iterator.foreach(Iterator.scala:943)
            at scala.collection.Iterator.foreach$(Iterator.scala:943)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
            at scala.collection.IterableLike.foreach(IterableLike.scala:74)
            at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
            at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:55)
            at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
            at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1456)
            at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51)
            at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
            at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
            at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ```
    
    ### Why are the changes needed?
    Fix a bug: the buffer of `AggregatingAccumulator` is never created when the input rows are empty, so serializing the accumulator dereferences a null buffer and throws the NullPointerException shown above.
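
    For illustration, here is a minimal sketch of the failure pattern, using hypothetical names rather than the actual `AggregatingAccumulator` code: an accumulator whose buffer is created lazily on the first input row must guard its serialization hook against a never-initialized buffer, which is exactly what the change below adds.
    ```
    // Hypothetical sketch of a lazily created aggregation buffer.
    // Not the real Spark implementation.
    class LazyBufferAccumulator {
      private var buffer: Array[Any] = _  // created only once a row arrives

      def add(value: Any): Unit = {
        if (buffer == null) buffer = new Array[Any](1)  // lazy initialization
        buffer(0) = value
      }

      // Invoked when the accumulator is serialized back to the driver.
      // For a partition with no input rows, `buffer` is still null, so it
      // must be checked before serializing the aggregate buffers in place.
      def withBufferSerialized(): this.type = {
        if (buffer != null) {
          // serialize each TypedImperativeAggregate buffer in place ...
        }
        this
      }
    }
    ```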
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    Users will see the correct results instead of a NullPointerException.
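
    For example, with the fix the reproduction above completes and the observed metrics become available. A sketch using the public `Observation` API (the expected value matches the new test case below):
    ```
    val namedObservation = Observation("named")
    val observed_df = spark.range(1, 10, 1, 10).observe(
      namedObservation, percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val"))
    observed_df.collect()
    // Blocks until the observed action finishes, then returns the metrics:
    namedObservation.get  // Map("percentile_approx_val" -> 5)
    ```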
    
    ### How was this patch tested?
    New test case.
    
    Closes #37977 from beliefer/SPARK-37203_followup.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
    (cherry picked from commit 7bbd975f165ec73c17e4604050f0828e3e5b9c0e)
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../apache/spark/sql/execution/AggregatingAccumulator.scala  | 12 +++++++-----
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala   |  2 ++
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
index d528e9114ba..667d1a67b39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
@@ -199,11 +199,13 @@ class AggregatingAccumulator private(
 
   override def withBufferSerialized(): AggregatingAccumulator = {
     assert(!isAtDriverSide)
-    var i = 0
-    // AggregatingAccumulator runs on executor, we should serialize all TypedImperativeAggregate.
-    while (i < typedImperatives.length) {
-      typedImperatives(i).serializeAggregateBufferInPlace(buffer)
-      i += 1
+    if (buffer != null) {
+      var i = 0
+      // AggregatingAccumulator runs on executor, we should serialize all TypedImperativeAggregate.
+      while (i < typedImperatives.length) {
+        typedImperatives(i).serializeAggregateBufferInPlace(buffer)
+        i += 1
+      }
     }
     this
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index ed4443542e5..c65ae966ef6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -769,6 +769,8 @@ class DatasetSuite extends QueryTest
 
     observe(spark.range(100), Map("percentile_approx_val" -> 49))
     observe(spark.range(0), Map("percentile_approx_val" -> null))
+    observe(spark.range(1, 10), Map("percentile_approx_val" -> 5))
+    observe(spark.range(1, 10, 1, 11), Map("percentile_approx_val" -> 5))
   }
 
   test("sample with replacement") {
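
A note on the added test cases: `spark.range(1, 10, 1, 11)` spreads 9 rows over 11 partitions, so at least two partitions are empty, which exercises the previously failing serialization path. An illustrative way to inspect the per-partition row counts (the exact layout may vary):
```
spark.range(1, 10, 1, 11).rdd
  .glom()          // one array per partition
  .map(_.length)   // rows in each partition
  .collect()       // contains zeros for the empty partitions
```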

