Posted to commits@spark.apache.org by we...@apache.org on 2022/06/21 08:35:29 UTC

[spark] branch master updated: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 29cf0c4121f [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status
29cf0c4121f is described below

commit 29cf0c4121fd2dc7eee4d34404ee9353c8295d72
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Tue Jun 21 16:35:10 2022 +0800

    [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status
    
    ### What changes were proposed in this pull request?
    There is a case where the current code may cause a data correctness issue (a minimal sketch of the race follows the list):
      1. A task that writes data commits its task output successfully, but its container is killed right after the commit. The task is therefore marked as failed, yet the data file still remains under the committedTaskPath.
      2. DAGScheduler calls `handleCompletedTask`, which calls `outputCommitCoordinator.taskCompleted`, and the OutputCommitCoordinator clears the commit lock of the failed task's partition.
      3. The failed task is rerun with a new attempt. Since the lock on this partition has been cleared, `outputCommitCoordinator.canCommit()` returns true for the new attempt, which then commits its task output and finally succeeds.
      4. Two committed task paths now remain under this job's attempt path for the same partition.
      5. commitJob commits the data of both committed task paths.
      6. The output data is duplicated.
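    
    A minimal sketch of this race, using hypothetical names (`PrePatchSketch`, `taskFailed`) and simplified state rather than the real `OutputCommitCoordinator` API:
    
    ```scala
    // Sketch of the pre-patch commit-lock handling (simplified; the real coordinator
    // also tracks stage attempts and previously failed attempts).
    object PrePatchSketch {
      case class TaskIdentifier(stageAttempt: Int, attemptNumber: Int)
    
      // partition -> authorized committer; null means the commit lock is free
      private val authorizedCommitters =
        scala.collection.mutable.Map[Int, TaskIdentifier]().withDefaultValue(null)
    
      def canCommit(partition: Int, attempt: TaskIdentifier): Boolean = synchronized {
        if (authorizedCommitters(partition) == null) {
          authorizedCommitters(partition) = attempt // take the lock
          true
        } else {
          authorizedCommitters(partition) == attempt
        }
      }
    
      // Pre-patch behavior: a failed authorized committer simply releases the lock,
      // even though its committed files may already sit under committedTaskPath.
      def taskFailed(partition: Int, attempt: TaskIdentifier): Unit = synchronized {
        if (authorizedCommitters(partition) == attempt) {
          authorizedCommitters(partition) = null
        }
      }
    
      def main(args: Array[String]): Unit = {
        val attempt0 = TaskIdentifier(0, 0)
        val attempt1 = TaskIdentifier(0, 1)
        assert(canCommit(0, attempt0)) // attempt 0 commits, then its container is killed
        taskFailed(0, attempt0)        // lock cleared, but attempt 0's files stay on disk
        assert(canCommit(0, attempt1)) // attempt 1 is also allowed to commit partition 0
        // -> two committed task paths for the same partition; commitJob duplicates the data
      }
    }
    ```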
    
    In this PR we make the following changes (the resulting control flow is sketched below this list):
    
    1. When a task commits its output successfully, the executor sends a CommitOutputSuccess message to the OutputCommitCoordinator.
    2. When the OutputCommitCoordinator handles `taskCompleted`, if the task failed but its commit succeeded, data duplication would happen, so we should fail the job.
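    
    As the diff below shows, on such a failure the coordinator now asks the DAGScheduler to fail the stage via a new `StageFailed` event instead of clearing the commit lock. A minimal sketch of that control flow (hypothetical classes `DagSchedulerSketch` and `PostPatchFlow`; the real code goes through the DAGScheduler event loop and `abortStage`):
    
    ```scala
    // Sketch of the post-patch flow: the coordinator posts an event, the event loop aborts the stage.
    case class StageFailed(stageId: Int, reason: String, exception: Option[Throwable])
    
    class DagSchedulerSketch {
      private val events = new java.util.concurrent.LinkedBlockingQueue[StageFailed]()
    
      // Coordinator side: mirrors dagScheduler.stageFailed(...), which only posts an event
      def stageFailed(stageId: Int, reason: String): Unit =
        events.put(StageFailed(stageId, reason, None))
    
      // Event-loop side: mirrors handleStageFailed, which looks up the stage and aborts it
      def handleNext(abortStage: (Int, String) => Unit): Unit = {
        val event = events.take()
        abortStage(event.stageId, event.reason)
      }
    }
    
    object PostPatchFlow {
      def main(args: Array[String]): Unit = {
        val dag = new DagSchedulerSketch
        // The authorized committer of (stage 1, partition 3) failed after committing its output
        dag.stageFailed(1, "Authorized committer (attemptNumber=0, stage=1, partition=3) failed; " +
          "but task commit success, data duplication may happen.")
        // Handling StageFailed aborts the stage, which fails all jobs that depend on it
        dag.handleNext((stageId, reason) => println(s"abortStage(stage=$stageId): $reason"))
      }
    }
    ```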
    
    ### Why are the changes needed?
    Fix a data duplication issue.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    Closes #36564 from AngersZhuuuu/SPARK-39195.
    
    Lead-authored-by: Angerszhuuuu <an...@gmail.com>
    Co-authored-by: AngersZhuuuu <an...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../main/scala/org/apache/spark/SparkContext.scala    |  7 ++++++-
 core/src/main/scala/org/apache/spark/SparkEnv.scala   | 12 +++++++++++-
 .../org/apache/spark/scheduler/DAGScheduler.scala     | 17 +++++++++++++++++
 .../apache/spark/scheduler/DAGSchedulerEvent.scala    |  4 ++++
 .../spark/scheduler/OutputCommitCoordinator.scala     | 11 +++++++----
 .../OutputCommitCoordinatorIntegrationSuite.scala     | 10 +++++-----
 .../scheduler/OutputCommitCoordinatorSuite.scala      | 19 ++++++++++---------
 7 files changed, 60 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f771dbf1c01..dc83ac7ae63 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -276,7 +276,12 @@ class SparkContext(config: SparkConf) extends Logging {
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
+    SparkEnv.createDriverEnv(
+      conf,
+      isLocal,
+      listenerBus,
+      SparkContext.numDriverCores(master, conf),
+      this)
   }
 
   private[spark] def env: SparkEnv = _env
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 66ee959dbd8..2f9152d31a5 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -169,6 +169,7 @@ object SparkEnv extends Logging {
       isLocal: Boolean,
       listenerBus: LiveListenerBus,
       numCores: Int,
+      sparkContext: SparkContext,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
     assert(conf.contains(DRIVER_HOST_ADDRESS),
       s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
@@ -191,6 +192,7 @@ object SparkEnv extends Logging {
       numCores,
       ioEncryptionKey,
       listenerBus = listenerBus,
+      Option(sparkContext),
       mockOutputCommitCoordinator = mockOutputCommitCoordinator
     )
   }
@@ -235,6 +237,7 @@ object SparkEnv extends Logging {
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
    */
+  // scalastyle:off argcount
   private def create(
       conf: SparkConf,
       executorId: String,
@@ -245,7 +248,9 @@ object SparkEnv extends Logging {
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       listenerBus: LiveListenerBus = null,
+      sc: Option[SparkContext] = None,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
+    // scalastyle:on argcount
 
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
 
@@ -391,7 +396,12 @@ object SparkEnv extends Logging {
     }
 
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      new OutputCommitCoordinator(conf, isDriver)
+      if (isDriver) {
+        new OutputCommitCoordinator(conf, isDriver, sc)
+      } else {
+        new OutputCommitCoordinator(conf, isDriver)
+      }
+
     }
     val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 289296f6fdb..9bd4a6f4478 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1175,6 +1175,13 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
   }
 
+  private[scheduler] def handleStageFailed(
+      stageId: Int,
+      reason: String,
+      exception: Option[Throwable]): Unit = {
+    stageIdToStage.get(stageId).foreach { abortStage(_, reason, exception) }
+  }
+
   private[scheduler] def handleTaskSetFailed(
       taskSet: TaskSet,
       reason: String,
@@ -2608,6 +2615,13 @@ private[spark] class DAGScheduler(
     runningStages -= stage
   }
 
+  /**
+   * Called by the OutputCommitCoordinator to cancel the stage because data duplication may happen.
+   */
+  private[scheduler] def stageFailed(stageId: Int, reason: String): Unit = {
+    eventProcessLoop.post(StageFailed(stageId, reason, None))
+  }
+
   /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
@@ -2876,6 +2890,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case completion: CompletionEvent =>
       dagScheduler.handleTaskCompletion(completion)
 
+    case StageFailed(stageId, reason, exception) =>
+      dagScheduler.handleStageFailed(stageId, reason, exception)
+
     case TaskSetFailed(taskSet, reason, exception) =>
       dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index f3798da5aa1..f9df8de620f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -88,6 +88,10 @@ private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossR
 private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String)
   extends DAGSchedulerEvent
 
+private[scheduler]
+case class StageFailed(stageId: Int, reason: String, exception: Option[Throwable])
+  extends DAGSchedulerEvent
+
 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
   extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index a5858ebf9cd..a33c2bb93bc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -44,7 +44,10 @@ private case class AskPermissionToCommitOutput(
  * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
  * for an extensive design discussion.
  */
-private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
+private[spark] class OutputCommitCoordinator(
+    conf: SparkConf,
+    isDriver: Boolean,
+    sc: Option[SparkContext] = None) extends Logging {
 
   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None
@@ -155,9 +158,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
         val taskId = TaskIdentifier(stageAttempt, attemptNumber)
         stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
         if (stageState.authorizedCommitters(partition) == taskId) {
-          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
-            s"partition=$partition) failed; clearing lock")
-          stageState.authorizedCommitters(partition) = null
+          sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
+            s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
+            s"but task commit success, data duplication may happen."))
         }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index 7d063c3b3ac..66b13be4f7a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -19,9 +19,8 @@ package org.apache.spark.scheduler
 
 import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
-import org.scalatest.time.{Seconds, Span}
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, TaskContext}
 
 /**
  * Integration tests for the OutputCommitCoordinator.
@@ -45,13 +44,14 @@ class OutputCommitCoordinatorIntegrationSuite
     sc = new SparkContext("local[2, 4]", "test", conf)
   }
 
-  test("exception thrown in OutputCommitter.commitTask()") {
+  test("SPARK-39195: exception thrown in OutputCommitter.commitTask()") {
     // Regression test for SPARK-10381
-    failAfter(Span(60, Seconds)) {
+    val e = intercept[SparkException] {
       withTempDir { tempDir =>
         sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
       }
-    }
+    }.getCause.getMessage
+    assert(e.endsWith("failed; but task commit success, data duplication may happen."))
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 360f7e1b4e4..62c559b52ff 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -86,11 +86,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
           conf: SparkConf,
           isLocal: Boolean,
           listenerBus: LiveListenerBus): SparkEnv = {
-        outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
+        outputCommitCoordinator =
+          spy(new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
         // Use Mockito.spy() to maintain the default infrastructure everywhere else.
         // This mocking allows us to control the coordinator responses in test cases.
         SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
-          SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
+          SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
       }
     }
     // Use Mockito.spy() to maintain the default infrastructure everywhere else
@@ -187,12 +188,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should now be allowed to become the authorized committer
-    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
-      nonAuthorizedCommitter + 2))
-    // There can only be one authorized committer
+    // A new task should not be allowed to commit; the stage fails due to potential data duplication
     assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
-      nonAuthorizedCommitter + 3))
+      nonAuthorizedCommitter + 2))
   }
 
   test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
@@ -226,7 +224,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
 
     // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
-    // then fail the 1st attempt and make sure the 4th one can commit again.
+    // then fail the 1st attempt; since the stage is failed because of potential data duplication,
+    // make sure the 4th attempt cannot commit.
     stage += 1
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -235,7 +234,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+    // A new task should not be allowed to become the authorized committer since the stage failed
+    // because of potential data duplication
+    assert(!outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
   }
 
   test("SPARK-24589: Make sure stage state is cleaned up") {

