Posted to commits@spark.apache.org by we...@apache.org on 2020/04/27 12:07:55 UTC

[spark] branch branch-3.0 updated: [SPARK-31485][CORE][3.0] Avoid application hang if only partial barrier tasks launched

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 60e6983  [SPARK-31485][CORE][3.0] Avoid application hang if only partial barrier tasks launched
60e6983 is described below

commit 60e69834d3cf32531d81d6583e689bd1b4c252fb
Author: yi.wu <yi...@databricks.com>
AuthorDate: Mon Apr 27 12:05:53 2020 +0000

    [SPARK-31485][CORE][3.0] Avoid application hang if only partial barrier tasks launched
    
    ### What changes were proposed in this pull request?
    
    Use `TaskSetManager.abort` to abort a barrier stage instead of throwing an exception within `resourceOffers`.
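
    Concretely, the partial-launch check in `TaskSchedulerImpl.resourceOffers` changes from a bare `require(...)` into an explicit abort. Condensed from the diff below (the full error message is elided here):

    ```scala
    // Before: require(...) threw an exception that the RPC framework swallowed.
    // After: abort the task set first, so the stage (and hence the job) fails fast.
    if (addressesWithDescs.size != taskSet.numTasks) {
      val errorMsg = s"Fail resource offers for barrier stage ${taskSet.stageId} ..."
      logWarning(errorMsg)
      taskSet.abort(errorMsg)
      throw new SparkException(errorMsg)
    }
    ```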
    
    ### Why are the changes needed?
    
    Any non-fatal exception thrown within the Spark RPC framework can be swallowed:
    
    https://github.com/apache/spark/blob/100fc58da54e026cda87832a10e2d06eaeccdf87/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala#L202-L211
    
    The method `TaskSchedulerImpl.resourceOffers` is also invoked within the Spark RPC framework. Thus, throwing an exception inside `resourceOffers` won't fail the application.
    
    As a result, if a barrier stage fails the check at `require(addressesWithDescs.size == taskSet.numTasks, ...)`, it will fail the check again and again until all tasks from the `TaskSetManager` are dequeued. But since the barrier stage is never actually executed, the application hangs.
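
    For reference, the linked `Inbox.scala` lines follow roughly this pattern: message handling runs inside a guard that catches `NonFatal` exceptions and merely logs them, so the message loop keeps going and the failure never propagates. A minimal sketch of the idea (simplified, not the exact Inbox code; names are approximate):

    ```scala
    import scala.util.control.NonFatal

    // Simplified sketch of the guard around RPC message handling.
    def safelyProcess(action: => Unit): Unit = {
      try {
        action
      } catch {
        case NonFatal(e) =>
          // The exception is logged and swallowed; a throw inside
          // resourceOffers therefore never fails the application.
          println(s"Ignoring error: $e")
      }
    }
    ```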
    
    The issue can be reproduced by the following test:
    
    ```scala
    initLocalClusterSparkContext(2)  // a local cluster with 2 workers, 1 core each
    val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
    val dep = new OneToOneDependency[Int](rdd0)
    // Both barrier tasks prefer executor_h_0 (which has only 1 core), so at most
    // one of them can be launched in a single round of resource offers.
    val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
    rdd.barrier().mapPartitions { iter =>
      BarrierTaskContext.get().barrier()
      iter
    }.collect()
    ```
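
    In this repro, both barrier tasks prefer `executor_h_0`, which has only one core in the two-worker cluster, so at most one of the two tasks can be launched in any single round of resource offers. The error message introduced by this fix recommends disabling delay scheduling as a workaround when partial launches happen frequently. A hypothetical application setup applying that workaround (names here are illustrative):

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical setup applying the workaround suggested by the new error
    // message: spark.locality.wait=0 disables delay scheduling.
    val conf = new SparkConf()
      .setMaster("local-cluster[2, 1, 1024]") // same cluster shape as the repro
      .setAppName("barrier-workaround-demo")  // illustrative app name
      .set("spark.locality.wait", "0")
    val sc = new SparkContext(conf)
    ```
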
    ### Does this PR introduce any user-facing change?
    
    Yes. Previously the application would hang; after this fix it fails fast.
    
    ### How was this patch tested?
    
    Added a regression test.
    
    Closes #28356 from Ngone51/bp-spark-31485.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 34 +++++++++++++++-------
 .../spark/scheduler/BarrierTaskContextSuite.scala  | 20 +++++++++++--
 2 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ed30473..97125f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -57,6 +57,11 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
  *   * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay
  *      scheduling
  *   * task-result-getter threads
+ *
+ * CAUTION: Any non-fatal exception thrown within the Spark RPC framework can be swallowed.
+ * Thus, throwing an exception in methods like resourceOffers or statusUpdate won't fail
+ * the application, but could lead to undefined behavior. Instead, we should use a method
+ * like TaskSetManager.abort() to abort a stage and then fail the application (SPARK-31485).
  */
 private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
@@ -356,9 +361,7 @@ private[spark] class TaskSchedulerImpl(
               // addresses are the same as that we allocated in taskSet.resourceOffer() since it's
               // synchronized. We don't remove the exact addresses allocated because the current
               // approach produces the identical result with less time complexity.
-              availableResources(i).getOrElse(rName,
-                throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
-                .remove(0, rInfo.addresses.size)
+              availableResources(i)(rName).remove(0, rInfo.addresses.size)
             }
             // Only update hosts for a barrier task.
             if (taskSet.isBarrier) {
@@ -516,11 +519,18 @@ private[spark] class TaskSchedulerImpl(
           // Check whether the barrier tasks are partially launched.
           // TODO SPARK-24818 handle the assert failure case (that can happen when some locality
           // requirements are not fulfilled, and we should revert the launched tasks).
-          require(addressesWithDescs.size == taskSet.numTasks,
-            s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
-              s"because only ${addressesWithDescs.size} out of a total number of " +
-              s"${taskSet.numTasks} tasks got resource offers. The resource offers may have " +
-              "been blacklisted or cannot fulfill task locality requirements.")
+          if (addressesWithDescs.size != taskSet.numTasks) {
+            val errorMsg =
+              s"Fail resource offers for barrier stage ${taskSet.stageId} because only " +
+                s"${addressesWithDescs.size} out of a total number of ${taskSet.numTasks}" +
+                s" tasks got resource offers. This happens because barrier execution currently " +
+                s"does not work gracefully with delay scheduling. We highly recommend disabling " +
+                s"delay scheduling by setting spark.locality.wait=0 as a workaround if " +
+                s"you see this error frequently."
+            logWarning(errorMsg)
+            taskSet.abort(errorMsg)
+            throw new SparkException(errorMsg)
+          }
 
           // materialize the barrier coordinator.
           maybeInitBarrierCoordinator()
@@ -582,8 +592,12 @@ private[spark] class TaskSchedulerImpl(
             if (state == TaskState.LOST) {
               // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
               // where each executor corresponds to a single task, so mark the executor as failed.
-              val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
-                "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
+              val execId = taskIdToExecutorId.getOrElse(tid, {
+                val errorMsg =
+                  "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"
+                taskSet.abort(errorMsg)
+                throw new SparkException(errorMsg)
+              })
               if (executorIdToRunningTaskIds.contains(execId)) {
                 reason = Some(
                   SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index bcf1fe2..2242d28 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -26,11 +26,11 @@ import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 
 class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
 
-  def initLocalClusterSparkContext(): Unit = {
+  def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
     val conf = new SparkConf()
       // Init local cluster here so each barrier task runs in a separated process, thus `barrier()`
       // call is actually useful.
-      .setMaster("local-cluster[4, 1, 1024]")
+      .setMaster(s"local-cluster[$numWorker, 1, 1024]")
       .setAppName("test-cluster")
       .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
@@ -276,4 +276,20 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
     initLocalClusterSparkContext()
     testBarrierTaskKilled(interruptOnKill = true)
   }
+
+  test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
+    initLocalClusterSparkContext(2)
+    val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
+    val dep = new OneToOneDependency[Int](rdd0)
+    // Set up a barrier stage with 2 tasks where both tasks prefer executor 0 (only 1 core)
+    // for scheduling. So, one of the tasks won't be scheduled in one round of resource offers.
+    val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
+    val errorMsg = intercept[SparkException] {
+      rdd.barrier().mapPartitions { iter =>
+        BarrierTaskContext.get().barrier()
+        iter
+      }.collect()
+    }.getMessage
+    assert(errorMsg.contains("Fail resource offers for barrier stage"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org