Posted to commits@spark.apache.org by sr...@apache.org on 2022/06/29 23:09:44 UTC
[spark] branch branch-3.3 updated: [SPARK-39553][CORE] Multi-thread unregister shuffle shouldn't throw NPE when using Scala 2.13
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new d3f7f42b278 [SPARK-39553][CORE] Multi-thread unregister shuffle shouldn't throw NPE when using Scala 2.13
d3f7f42b278 is described below
commit d3f7f42b2780416b2cf5cb50e522909bb68e8c56
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Wed Jun 29 18:09:10 2022 -0500
[SPARK-39553][CORE] Multi-thread unregister shuffle shouldn't throw NPE when using Scala 2.13
### What changes were proposed in this pull request?
This PR adds a `shuffleStatus != null` check to the `o.a.s.MapOutputTrackerMaster#unregisterShuffle` method to avoid throwing an NPE when using Scala 2.13.
### Why are the changes needed?
Ensure that no NPE is thrown when `o.a.s.MapOutputTrackerMaster#unregisterShuffle` is called by multiple threads. This PR is only relevant to Scala 2.13.
The `o.a.s.MapOutputTrackerMaster#unregisterShuffle` method can be called concurrently from the following two paths:
- BlockManagerStorageEndpoint:
https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala#L56-L62
- ContextCleaner:
https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L234-L241
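In principle, two callers racing to remove the same shuffle ID should be harmless as long as the removal is atomic: one caller gets the value and the other gets nothing. A minimal sketch of that expectation, using a plain `java.util.concurrent.ConcurrentHashMap` as a stand-in (the object name, key, and value here are hypothetical and are not Spark's actual fields):

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger

object ConcurrentRemoveSketch {
  /** Races `numThreads` threads removing the same key and returns how many of them received the value. */
  def countWinners(numThreads: Int): Int = {
    val map = new ConcurrentHashMap[Int, String]()
    map.put(1, "status")
    val start = new CountDownLatch(1)
    val winners = new AtomicInteger(0)
    val threads = (0 until numThreads).map { _ =>
      new Thread(() => {
        start.await()
        // ConcurrentHashMap.remove is atomic and returns null when the key is absent,
        // so Option(...) turns a lost race into None.
        Option(map.remove(1)).foreach(_ => winners.incrementAndGet())
      })
    }
    threads.foreach(_.start())
    start.countDown() // release all threads at once
    threads.foreach(_.join())
    winners.get()
  }
}
```

With an atomic removal, `ConcurrentRemoveSketch.countWinners(n)` should be 1 for any `n >= 1`, and the losing threads never run the closure at all.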
When testing with Scala 2.13, for example in the `sql/core` module, many logs like the following appear, although they do not cause UT failures:
```
17:44:09.957 WARN org.apache.spark.storage.BlockManagerMaster: Failed to remove shuffle 87 - null
java.lang.NullPointerException
at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
at scala.Option.foreach(Option.scala:437)
at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:59)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.scala:17)
at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678)
at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
17:44:09.958 ERROR org.apache.spark.ContextCleaner: Error cleaning shuffle 94
java.lang.NullPointerException
at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
at scala.Option.foreach(Option.scala:437)
at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
at org.apache.spark.ContextCleaner.doCleanupShuffle(ContextCleaner.scala:241)
at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:202)
at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195)
at scala.Option.foreach(Option.scala:437)
at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1432)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)
```
I think this is a bug in Scala 2.13.8 and have already submitted an issue at https://github.com/scala/bug/issues/12613. This PR is only a protective measure; we should remove this protection after Scala 2.13 fixes the issue (possibly via https://github.com/scala/scala/pull/9957).
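The stack traces above show the NPE raised inside the closure passed to `Option.foreach`, which suggests that `remove` handed back `Some(null)` rather than `None`. The sketch below illustrates only that distinction and the effect of the null guard; `Status`, `unguarded`, and `guarded` are hypothetical names for illustration, not Spark or Scala library code.

```scala
object SomeNullSketch {
  /** Hypothetical stand-in for a per-shuffle status object. */
  final class Status {
    var invalidated: Boolean = false
    def invalidate(): Unit = { invalidated = true }
  }

  // Without a guard: Some(null) is non-empty, so the closure runs and dereferences null.
  def unguarded(opt: Option[Status]): Unit =
    opt.foreach(_.invalidate())

  // With the guard: a null payload is skipped, a real value is still invalidated.
  def guarded(opt: Option[Status]): Unit =
    opt.foreach { status =>
      if (status != null) status.invalidate()
    }
}
```

Note that `Option(null)` evaluates to `None`, while `Some(null)` is a non-empty `Option` holding `null`; only the latter reaches the closure, which is why the guard has to sit inside `foreach`.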
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass GA
- Added a new test `SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE` to `MapOutputTrackerSuite`, which can be run manually as follows:
```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl core -am -Pscala-2.13
mvn clean test -pl core -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.MapOutputTrackerSuite
```
**Before**
```
- SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE *** FAILED ***
3 did not equal 0 (MapOutputTrackerSuite.scala:971)
Run completed in 17 seconds, 505 milliseconds.
Total number of tests run: 25
Suites: completed 2, aborted 0
Tests: succeeded 24, failed 1, canceled 0, ignored 1, pending 0
*** 1 TEST FAILED ***
```
**After**
```
- SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE
Run completed in 17 seconds, 996 milliseconds.
Total number of tests run: 25
Suites: completed 2, aborted 0
Tests: succeeded 25, failed 0, canceled 0, ignored 1, pending 0
All tests passed.
```
Closes #37024 from LuciferYang/SPARK-39553.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
(cherry picked from commit 29258964cae45cea43617ade971fb4ea9fe2902a)
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../scala/org/apache/spark/MapOutputTracker.scala | 8 +++--
.../org/apache/spark/MapOutputTrackerSuite.scala | 34 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index e6ed469250b..b1974948430 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -879,8 +879,12 @@ private[spark] class MapOutputTrackerMaster(
   /** Unregister shuffle data */
   def unregisterShuffle(shuffleId: Int): Unit = {
     shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
-      shuffleStatus.invalidateSerializedMapOutputStatusCache()
-      shuffleStatus.invalidateSerializedMergeOutputStatusCache()
+      // SPARK-39553: Add protection for Scala 2.13 due to https://github.com/scala/bug/issues/12613
+      // We should revert this if Scala 2.13 solves this issue.
+      if (shuffleStatus != null) {
+        shuffleStatus.invalidateSerializedMapOutputStatusCache()
+        shuffleStatus.invalidateSerializedMergeOutputStatusCache()
+      }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 5e502eb5687..a13527f4b74 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark
+import java.util.concurrent.atomic.LongAdder
+
import scala.collection.mutable.ArrayBuffer
import org.mockito.ArgumentMatchers.any
@@ -938,4 +940,36 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
       assert(worker.shufflePushMergerLocations.isEmpty)
     }
   }
+
+  test("SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = newTrackerMaster()
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+    val shuffleIdRange = 0 until 100
+    shuffleIdRange.foreach { shuffleId =>
+      tracker.registerShuffle(shuffleId, 2, MergeStatus.SHUFFLE_PUSH_DUMMY_NUM_REDUCES)
+    }
+    val npeCounter = new LongAdder()
+    // More threads will help to reproduce the problem
+    val threads = new Array[Thread](5)
+    threads.indices.foreach { i =>
+      threads(i) = new Thread() {
+        override def run(): Unit = {
+          shuffleIdRange.foreach { shuffleId =>
+            try {
+              tracker.unregisterShuffle(shuffleId)
+            } catch {
+              case _: NullPointerException => npeCounter.increment()
+            }
+          }
+        }
+      }
+    }
+    threads.foreach(_.start())
+    threads.foreach(_.join())
+    tracker.stop()
+    rpcEnv.shutdown()
+    assert(npeCounter.intValue() == 0)
+  }
 }