You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/04/27 12:07:55 UTC
[spark] branch branch-3.0 updated: [SPARK-31485][CORE][3.0] Avoid
application hang if only partial barrier tasks launched
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 60e6983 [SPARK-31485][CORE][3.0] Avoid application hang if only partial barrier tasks launched
60e6983 is described below
commit 60e69834d3cf32531d81d6583e689bd1b4c252fb
Author: yi.wu <yi...@databricks.com>
AuthorDate: Mon Apr 27 12:05:53 2020 +0000
[SPARK-31485][CORE][3.0] Avoid application hang if only partial barrier tasks launched
### What changes were proposed in this pull request?
Use `TaskSetManager.abort` to abort a barrier stage instead of throwing an exception within `resourceOffers`.
### Why are the changes needed?
Any non-fatal exception thrown within the Spark RPC framework can be swallowed:
https://github.com/apache/spark/blob/100fc58da54e026cda87832a10e2d06eaeccdf87/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala#L202-L211
The method `TaskSchedulerImpl.resourceOffers` is also within the scope of the Spark RPC framework. Thus, throwing an exception inside `resourceOffers` won't fail the application.
As a result, if a barrier stage fails the require check at `require(addressesWithDescs.size == taskSet.numTasks, ...)`, it will fail the check again and again until all tasks from the `TaskSetManager` are dequeued. But since the barrier stage is never actually executed, the application hangs.
The issue can be reproduced by the following test:
```scala
initLocalClusterSparkContext(2)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"),Seq("executor_h_0")))
rdd.barrier().mapPartitions { iter =>
BarrierTaskContext.get().barrier()
iter
}.collect()
```
### Does this PR introduce any user-facing change?
Yes. Previously the application would hang; after this fix it fails fast.
### How was this patch tested?
Added a regression test.
Closes #28356 from Ngone51/bp-spark-31485.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 34 +++++++++++++++-------
.../spark/scheduler/BarrierTaskContextSuite.scala | 20 +++++++++++--
2 files changed, 42 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ed30473..97125f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -57,6 +57,11 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
* * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay
* scheduling
* * task-result-getter threads
+ *
+ * CAUTION: Any non-fatal exception thrown within the Spark RPC framework can be swallowed.
+ * Thus, throwing an exception in methods like resourceOffers or statusUpdate won't fail
+ * the application, but could lead to undefined behavior. Instead, we should use a method like
+ * TaskSetManager.abort() to abort a stage and then fail the application (SPARK-31485).
*/
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
@@ -356,9 +361,7 @@ private[spark] class TaskSchedulerImpl(
// addresses are the same as that we allocated in taskSet.resourceOffer() since it's
// synchronized. We don't remove the exact addresses allocated because the current
// approach produces the identical result with less time complexity.
- availableResources(i).getOrElse(rName,
- throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
- .remove(0, rInfo.addresses.size)
+ availableResources(i)(rName).remove(0, rInfo.addresses.size)
}
// Only update hosts for a barrier task.
if (taskSet.isBarrier) {
@@ -516,11 +519,18 @@ private[spark] class TaskSchedulerImpl(
// Check whether the barrier tasks are partially launched.
// TODO SPARK-24818 handle the assert failure case (that can happen when some locality
// requirements are not fulfilled, and we should revert the launched tasks).
- require(addressesWithDescs.size == taskSet.numTasks,
- s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
- s"because only ${addressesWithDescs.size} out of a total number of " +
- s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
- "been blacklisted or cannot fulfill task locality requirements.")
+ if (addressesWithDescs.size != taskSet.numTasks) {
+ val errorMsg =
+ s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
+ s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
+ s" tasks got resource offers. This happens because barrier execution currently " +
+ s"does not work gracefully with delay scheduling. We highly recommend you to " +
+ s"disable delay scheduling by setting spark.locality.wait=0 as a workaround if " +
+ s"you see this error frequently."
+ logWarning(errorMsg)
+ taskSet.abort(errorMsg)
+ throw new SparkException(errorMsg)
+ }
// materialize the barrier coordinator.
maybeInitBarrierCoordinator()
@@ -582,8 +592,12 @@ private[spark] class TaskSchedulerImpl(
if (state == TaskState.LOST) {
// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
- val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
- "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
+ val execId = taskIdToExecutorId.getOrElse(tid, {
+ val errorMsg =
+ "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"
+ taskSet.abort(errorMsg)
+ throw new SparkException(errorMsg)
+ })
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index bcf1fe2..2242d28 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -26,11 +26,11 @@ import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
- def initLocalClusterSparkContext(): Unit = {
+ def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
val conf = new SparkConf()
// Init local cluster here so each barrier task runs in a separated process, thus `barrier()`
// call is actually useful.
- .setMaster("local-cluster[4, 1, 1024]")
+ .setMaster(s"local-cluster[$numWorker, 1, 1024]")
.setAppName("test-cluster")
.set(TEST_NO_STAGE_RETRY, true)
sc = new SparkContext(conf)
@@ -276,4 +276,20 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
initLocalClusterSparkContext()
testBarrierTaskKilled(interruptOnKill = true)
}
+
+ test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
+ initLocalClusterSparkContext(2)
+ val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
+ val dep = new OneToOneDependency[Int](rdd0)
+ // Set up a barrier stage with 2 tasks that both prefer executor 0 (which has only 1 core),
+ // so one of the tasks won't be scheduled in a single round of resource offers.
+ val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
+ val errorMsg = intercept[SparkException] {
+ rdd.barrier().mapPartitions { iter =>
+ BarrierTaskContext.get().barrier()
+ iter
+ }.collect()
+ }.getMessage
+ assert(errorMsg.contains("Fail resource offers for barrier stage"))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org