You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2022/09/23 11:23:08 UTC
[spark] branch branch-3.3 updated: [SPARK-40535][SQL] Fix bug the buffer of AggregatingAccumulator will not be created if the input rows is empty
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new fb42c3ecd73 [SPARK-40535][SQL] Fix bug the buffer of AggregatingAccumulator will not be created if the input rows is empty
fb42c3ecd73 is described below
commit fb42c3ecd7395afeb871a9d782d3844eda7f44f4
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Sep 23 14:22:09 2022 +0300
[SPARK-40535][SQL] Fix bug the buffer of AggregatingAccumulator will not be created if the input rows is empty
### What changes were proposed in this pull request?
When `AggregatingAccumulator` serializes its aggregate buffer, it may throw a `NullPointerException` (NPE).
The following test case reproduces this error:
```
val namedObservation = Observation("named")
val df = spark.range(1, 10, 1, 10)
val observed_df = df.observe(
namedObservation, percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val"))
observed_df.collect()
```
It throws the following exception:
```
13:45:10.976 ERROR org.apache.spark.util.Utils: Exception encountered
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:641)
at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.getBufferObject(interfaces.scala:602)
at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.serializeAggregateBufferInPlace(interfaces.scala:624)
at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:205)
at org.apache.spark.sql.execution.AggregatingAccumulator.withBufferSerialized(AggregatingAccumulator.scala:33)
at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:186)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1245)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:55)
at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:55)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:55)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1456)
at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51)
at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:663)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
### Why are the changes needed?
Fix a bug.
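After investigation, the root cause is that the buffer of `AggregatingAccumulator` is never created when its input rows are empty (for example, a partition that receives no rows). `withBufferSerialized` then serializes a null buffer, which triggers the NPE above.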
### Does this PR introduce _any_ user-facing change?
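Yes.
Queries that previously failed with an NPE (e.g. `observe` with `percentile_approx` over a Dataset that has empty partitions) now return correct results.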
### How was this patch tested?
New test cases in `DatasetSuite`.
Closes #37977 from beliefer/SPARK-37203_followup.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
(cherry picked from commit 7bbd975f165ec73c17e4604050f0828e3e5b9c0e)
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../apache/spark/sql/execution/AggregatingAccumulator.scala | 12 +++++++-----
.../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 ++
2 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
index d528e9114ba..667d1a67b39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
@@ -199,11 +199,13 @@ class AggregatingAccumulator private(
override def withBufferSerialized(): AggregatingAccumulator = {
assert(!isAtDriverSide)
- var i = 0
- // AggregatingAccumulator runs on executor, we should serialize all TypedImperativeAggregate.
- while (i < typedImperatives.length) {
- typedImperatives(i).serializeAggregateBufferInPlace(buffer)
- i += 1
+ if (buffer != null) {
+ var i = 0
+ // AggregatingAccumulator runs on executor, we should serialize all TypedImperativeAggregate.
+ while (i < typedImperatives.length) {
+ typedImperatives(i).serializeAggregateBufferInPlace(buffer)
+ i += 1
+ }
}
this
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index ed4443542e5..c65ae966ef6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -769,6 +769,8 @@ class DatasetSuite extends QueryTest
observe(spark.range(100), Map("percentile_approx_val" -> 49))
observe(spark.range(0), Map("percentile_approx_val" -> null))
+ observe(spark.range(1, 10), Map("percentile_approx_val" -> 5))
+ observe(spark.range(1, 10, 1, 11), Map("percentile_approx_val" -> 5))
}
test("sample with replacement") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org