You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/04/01 01:18:51 UTC

spark git commit: [SPARK-6614] OutputCommitCoordinator should clear authorized committer only after authorized committer fails, not after any failure

Repository: spark
Updated Branches:
  refs/heads/master 0e00f12d3 -> 37326079d


[SPARK-6614] OutputCommitCoordinator should clear authorized committer only after authorized committer fails, not after any failure

In OutputCommitCoordinator, there is some logic to clear the authorized committer's commit lock if that task fails. However, the current code also clears this lock when other, non-authorized tasks fail, which is an obvious bug.

In theory, this could allow a new committer to start, run to completion, and commit output before the authorized committer finishes, but this race is unlikely to occur often in practice because exposing it requires a complex combination of failure and timing conditions.

This patch addresses this issue and adds a regression test.

Thanks to aarondav for spotting this issue.

Author: Josh Rosen <jo...@databricks.com>

Closes #5276 from JoshRosen/SPARK-6614 and squashes the following commits:

d532ba7 [Josh Rosen] Check whether failed task was authorized committer
cbb3784 [Josh Rosen] Add regression test for SPARK-6614


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37326079
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37326079
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37326079

Branch: refs/heads/master
Commit: 37326079d818fdb140415a65653767d997613dac
Parents: 0e00f12
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Mar 31 16:18:39 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Mar 31 16:18:39 2015 -0700

----------------------------------------------------------------------
 .../scheduler/OutputCommitCoordinator.scala     |  8 ++++---
 .../OutputCommitCoordinatorSuite.scala          | 25 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37326079/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 17055e2..9e29fd1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -113,9 +113,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
         logInfo(
           s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
       case otherReason =>
-        logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
-          s" clearing lock")
-        authorizedCommitters.remove(partition)
+        if (authorizedCommitters.get(partition).exists(_ == attempt)) {
+          logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
+            s" clearing lock")
+          authorizedCommitters.remove(partition)
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/37326079/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index c8c9578..cf97707 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -161,6 +161,31 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
     }
     assert(tempDir.list().size === 0)
   }
+
+  test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
+    val stage: Int = 1
+    val partition: Long = 2
+    val authorizedCommitter: Long = 3
+    val nonAuthorizedCommitter: Long = 100
+    outputCommitCoordinator.stageStart(stage)
+    assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
+    assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+    // The non-authorized committer fails
+    outputCommitCoordinator.taskCompleted(
+      stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+    // New tasks should still not be able to commit because the authorized committer has not failed
+    assert(
+      !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+    // The authorized committer now fails, clearing the lock
+    outputCommitCoordinator.taskCompleted(
+      stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+    // A new task should now be allowed to become the authorized committer
+    assert(
+      outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+    // There can only be one authorized committer
+    assert(
+      !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org