Posted to commits@spark.apache.org by jo...@apache.org on 2015/09/21 22:21:18 UTC

spark git commit: [SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator (branch-1.4 backport)

Repository: spark
Updated Branches:
  refs/heads/branch-1.4 0cf28708e -> df9e39470


[SPARK-10381] Fix mixup of taskAttemptNumber & attemptId in OutputCommitCoordinator (branch-1.4 backport)

This is a backport of #8544 to `branch-1.4` for inclusion in 1.4.2.

Author: Josh Rosen <jo...@databricks.com>

Closes #8789 from JoshRosen/SPARK-10381-1.4.
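
For readers skimming the diff below: the distinction this patch enforces can be sketched with a small Scala snippet (illustration only, not part of this commit) using the TaskContext API that the patched code relies on:

    import org.apache.spark.TaskContext

    // Illustration only: the two identifiers that were previously conflated.
    def describeAttempt(ctx: TaskContext): String = {
      // attemptNumber() is the small per-task retry counter (0, 1, 2, ...);
      // this is what the OutputCommitCoordinator keys its commit locks on
      // after this change.
      val attemptNumber: Int = ctx.attemptNumber()
      // taskAttemptId() is a globally unique Long across all attempts of all
      // tasks in the application, and is not a valid commit-lock key.
      val taskAttemptId: Long = ctx.taskAttemptId()
      s"partition=${ctx.partitionId()} attemptNumber=$attemptNumber taskAttemptId=$taskAttemptId"
    }

Before this patch the coordinator's types allowed the Long attempt ID and the Int attempt number to be passed interchangeably; the type changes in the diff (TaskAttemptId: Long replaced by TaskAttemptNumber: Int) rule that out.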


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df9e3947
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df9e3947
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df9e3947

Branch: refs/heads/branch-1.4
Commit: df9e39470823a04677edbce09dffec47cb4a444f
Parents: 0cf2870
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Sep 21 13:21:11 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Sep 21 13:21:11 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkHadoopWriter.scala    |  3 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |  7 +-
 .../spark/executor/CommitDeniedException.scala  |  4 +-
 .../spark/mapred/SparkHadoopMapRedUtil.scala    | 11 ++--
 .../apache/spark/scheduler/DAGScheduler.scala   |  7 +-
 .../scheduler/OutputCommitCoordinator.scala     | 48 +++++++-------
 .../org/apache/spark/scheduler/TaskInfo.scala   |  7 +-
 .../spark/status/api/v1/AllStagesResource.scala |  2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |  6 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  2 +-
 ...utputCommitCoordinatorIntegrationSuite.scala | 68 ++++++++++++++++++++
 .../OutputCommitCoordinatorSuite.scala          | 24 ++++---
 .../apache/spark/util/JsonProtocolSuite.scala   |  2 +-
 project/MimaExcludes.scala                      | 17 +++++
 .../org/apache/spark/sql/sources/commands.scala |  3 +-
 .../spark/sql/hive/hiveWriterContainers.scala   |  2 +-
 16 files changed, 156 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 59ac82c..dfbdf27 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -103,8 +103,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 
   def commit() {
-    SparkHadoopMapRedUtil.commitTask(
-      getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
+    SparkHadoopMapRedUtil.commitTask(getOutputCommitter(), getTaskContext(), jobID, splitID)
   }
 
   def commitJob() {

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 48fd3e7..b7fe655 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -151,9 +151,12 @@ case object TaskKilled extends TaskFailedReason {
  * Task requested the driver to commit, but was denied.
  */
 @DeveloperApi
-case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
+case class TaskCommitDenied(
+    jobID: Int,
+    partitionID: Int,
+    attemptNumber: Int) extends TaskFailedReason {
   override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
-    s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+    s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
index f47d7ef..7d84889 100644
--- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala
@@ -26,8 +26,8 @@ private[spark] class CommitDeniedException(
     msg: String,
     jobID: Int,
     splitID: Int,
-    attemptID: Int)
+    attemptNumber: Int)
   extends Exception(msg) {
 
-  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptID)
+  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index f0eb52e..c57dff7 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -90,8 +90,7 @@ object SparkHadoopMapRedUtil extends Logging {
       committer: MapReduceOutputCommitter,
       mrTaskContext: MapReduceTaskAttemptContext,
       jobId: Int,
-      splitId: Int,
-      attemptId: Int): Unit = {
+      splitId: Int): Unit = {
 
     val mrTaskAttemptID = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(mrTaskContext)
 
@@ -121,7 +120,8 @@ object SparkHadoopMapRedUtil extends Logging {
 
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)
+        val taskAttemptNumber = TaskContext.get().attemptNumber()
+        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
 
         if (canCommit) {
           performCommit()
@@ -131,7 +131,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new attempts, if necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, jobId, splitId, attemptId)
+          throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the commit coordination
@@ -151,7 +151,6 @@ object SparkHadoopMapRedUtil extends Logging {
       committer,
       mrTaskContext,
       sparkTaskContext.stageId(),
-      sparkTaskContext.partitionId(),
-      sparkTaskContext.attemptNumber())
+      sparkTaskContext.partitionId())
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fd20ebb..7778e82 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -981,8 +981,11 @@ class DAGScheduler(
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
 
-    outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
-      event.taskInfo.attempt, event.reason)
+    outputCommitCoordinator.taskCompleted(
+      stageId,
+      task.partitionId,
+      event.taskInfo.attemptNumber, // this is a task attempt number
+      event.reason)
 
     // The success case is dealt with separately below, since we need to compute accumulator
     // updates before posting.

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 8321037..27e373f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, RpcEndpoint
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
 private case object StopCoordinator extends OutputCommitCoordinationMessage
-private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
+private case class AskPermissionToCommitOutput(stage: Int, partition: Int, attemptNumber: Int)
 
 /**
  * Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins"
@@ -44,8 +44,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   var coordinatorRef: Option[RpcEndpointRef] = None
 
   private type StageId = Int
-  private type PartitionId = Long
-  private type TaskAttemptId = Long
+  private type PartitionId = Int
+  private type TaskAttemptNumber = Int
 
   /**
    * Map from active stages's id => partition id => task attempt with exclusive lock on committing
@@ -57,7 +57,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance.
    */
   private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
-  private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
+  private type CommittersByStageMap =
+    mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptNumber]]
 
   /**
    * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
@@ -75,14 +76,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
    *
    * @param stage the stage number
    * @param partition the partition number
-   * @param attempt a unique identifier for this task attempt
+   * @param attemptNumber how many times this task has been attempted
+   *                      (see [[TaskContext.attemptNumber()]])
    * @return true if this task is authorized to commit, false otherwise
    */
   def canCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = {
-    val msg = AskPermissionToCommitOutput(stage, partition, attempt)
+      attemptNumber: TaskAttemptNumber): Boolean = {
+    val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
         endpointRef.askWithRetry[Boolean](msg)
@@ -95,7 +97,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
 
   // Called by DAGScheduler
   private[scheduler] def stageStart(stage: StageId): Unit = synchronized {
-    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptId]()
+    authorizedCommittersByStage(stage) = mutable.HashMap[PartitionId, TaskAttemptNumber]()
   }
 
   // Called by DAGScheduler
@@ -107,7 +109,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private[scheduler] def taskCompleted(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId,
+      attemptNumber: TaskAttemptNumber,
       reason: TaskEndReason): Unit = synchronized {
     val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, {
       logDebug(s"Ignoring task completion for completed stage")
@@ -117,12 +119,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
       case Success =>
       // The task output has been committed successfully
       case denied: TaskCommitDenied =>
-        logInfo(
-          s"Task was denied committing, stage: $stage, partition: $partition, attempt: $attempt")
+        logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " +
+          s"attempt: $attemptNumber")
       case otherReason =>
-        if (authorizedCommitters.get(partition).exists(_ == attempt)) {
-          logDebug(s"Authorized committer $attempt (stage=$stage, partition=$partition) failed;" +
-            s" clearing lock")
+        if (authorizedCommitters.get(partition).exists(_ == attemptNumber)) {
+          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
+            s"partition=$partition) failed; clearing lock")
           authorizedCommitters.remove(partition)
         }
     }
@@ -140,21 +142,23 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
   private[scheduler] def handleAskPermissionToCommit(
       stage: StageId,
       partition: PartitionId,
-      attempt: TaskAttemptId): Boolean = synchronized {
+      attemptNumber: TaskAttemptNumber): Boolean = synchronized {
     authorizedCommittersByStage.get(stage) match {
       case Some(authorizedCommitters) =>
         authorizedCommitters.get(partition) match {
           case Some(existingCommitter) =>
-            logDebug(s"Denying $attempt to commit for stage=$stage, partition=$partition; " +
-              s"existingCommitter = $existingCommitter")
+            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition; existingCommitter = $existingCommitter")
             false
           case None =>
-            logDebug(s"Authorizing $attempt to commit for stage=$stage, partition=$partition")
-            authorizedCommitters(partition) = attempt
+            logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " +
+              s"partition=$partition")
+            authorizedCommitters(partition) = attemptNumber
             true
         }
       case None =>
-        logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+        logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +
+          s"partition $partition to commit")
         false
     }
   }
@@ -174,9 +178,9 @@ private[spark] object OutputCommitCoordinator {
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-      case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
+      case AskPermissionToCommitOutput(stage, partition, attemptNumber) =>
         context.reply(
-          outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
+          outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, attemptNumber))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 132a9ce..f113c2b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -29,7 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
-    val attempt: Int,
+    val attemptNumber: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
@@ -95,7 +95,10 @@ class TaskInfo(
     }
   }
 
-  def id: String = s"$index.$attempt"
+  @deprecated("Use attemptNumber", "1.6.0")
+  def attempt: Int = attemptNumber
+
+  def id: String = s"$index.$attemptNumber"
 
   def duration: Long = {
     if (!finished) {

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 390c136..24a0b52 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -127,7 +127,7 @@ private[v1] object AllStagesResource {
     new TaskData(
       taskId = uiData.taskInfo.taskId,
       index = uiData.taskInfo.index,
-      attempt = uiData.taskInfo.attempt,
+      attempt = uiData.taskInfo.attemptNumber,
       launchTime = new Date(uiData.taskInfo.launchTime),
       executorId = uiData.taskInfo.executorId,
       host = uiData.taskInfo.host,

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 7b94eac..6ed5cb0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -569,7 +569,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
           serializationTimeProportionPos + serializationTimeProportion
 
         val index = taskInfo.index
-        val attempt = taskInfo.attempt
+        val attempt = taskInfo.attemptNumber
         val timelineObject =
           s"""
              {
@@ -742,8 +742,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
       <tr>
         <td>{info.index}</td>
         <td>{info.taskId}</td>
-        <td sorttable_customkey={info.attempt.toString}>{
-          if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
+        <td sorttable_customkey={info.attemptNumber.toString}>{
+          if (info.speculative) s"${info.attempt} (speculative)" else info.attemptNumber.toString
         }</td>
         <td>{info.status}</td>
         <td>{info.taskLocality}</td>

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 3f162d1..dc8ca0e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -250,7 +250,7 @@ private[spark] object JsonProtocol {
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
-    ("Attempt" -> taskInfo.attempt) ~
+    ("Attempt" -> taskInfo.attemptNumber) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
new file mode 100644
index 0000000..1ae5b03
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.{Span, Seconds}
+
+import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext, SparkFunSuite, TaskContext}
+import org.apache.spark.util.Utils
+
+/**
+ * Integration tests for the OutputCommitCoordinator.
+ *
+ * See also: [[OutputCommitCoordinatorSuite]] for unit tests that use mocks.
+ */
+class OutputCommitCoordinatorIntegrationSuite
+  extends SparkFunSuite
+  with LocalSparkContext
+  with Timeouts {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val conf = new SparkConf()
+      .set("master", "local[2,4]")
+      .set("spark.speculation", "true")
+      .set("spark.hadoop.mapred.output.committer.class",
+        classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
+    sc = new SparkContext("local[2, 4]", "test", conf)
+  }
+
+  test("exception thrown in OutputCommitter.commitTask()") {
+    // Regression test for SPARK-10381
+    failAfter(Span(60, Seconds)) {
+      val tempDir = Utils.createTempDir()
+      try {
+        sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
+      } finally {
+        Utils.deleteRecursively(tempDir)
+      }
+    }
+  }
+}
+
+private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
+  override def commitTask(context: TaskAttemptContext): Unit = {
+    val ctx = TaskContext.get()
+    if (ctx.attemptNumber < 1) {
+      throw new java.io.FileNotFoundException("Intentional exception")
+    }
+    super.commitTask(context)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index a9036da..3fd8ada 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -63,6 +63,9 @@ import scala.language.postfixOps
  * was not in SparkHadoopWriter, the tests would still pass because only one of the
  * increments would be captured even though the commit in both tasks was executed
  * erroneously.
+ *
+ * See also: [[OutputCommitCoordinatorIntegrationSuite]] for integration tests that do
+ * not use mocks.
  */
 class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -164,27 +167,28 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("Only authorized committer failures can clear the authorized committer lock (SPARK-6614)") {
     val stage: Int = 1
-    val partition: Long = 2
-    val authorizedCommitter: Long = 3
-    val nonAuthorizedCommitter: Long = 100
+    val partition: Int = 2
+    val authorizedCommitter: Int = 3
+    val nonAuthorizedCommitter: Int = 100
     outputCommitCoordinator.stageStart(stage)
-    assert(outputCommitCoordinator.canCommit(stage, partition, attempt = authorizedCommitter))
-    assert(!outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter))
+
+    assert(outputCommitCoordinator.canCommit(stage, partition, authorizedCommitter))
+    assert(!outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter))
     // The non-authorized committer fails
     outputCommitCoordinator.taskCompleted(
-      stage, partition, attempt = nonAuthorizedCommitter, reason = TaskKilled)
+      stage, partition, attemptNumber = nonAuthorizedCommitter, reason = TaskKilled)
     // New tasks should still not be able to commit because the authorized committer has not failed
     assert(
-      !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 1))
+      !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 1))
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(
-      stage, partition, attempt = authorizedCommitter, reason = TaskKilled)
+      stage, partition, attemptNumber = authorizedCommitter, reason = TaskKilled)
     // A new task should now be allowed to become the authorized committer
     assert(
-      outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 2))
+      outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 2))
     // There can only be one authorized committer
     assert(
-      !outputCommitCoordinator.canCommit(stage, partition, attempt = nonAuthorizedCommitter + 3))
+      !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index dec92be..74b3dea 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -479,7 +479,7 @@ class JsonProtocolSuite extends SparkFunSuite {
   private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
     assert(info1.taskId === info2.taskId)
     assert(info1.index === info2.index)
-    assert(info1.attempt === info2.attempt)
+    assert(info1.attemptNumber === info2.attemptNumber)
     assert(info1.launchTime === info2.launchTime)
     assert(info1.executorId === info2.executorId)
     assert(info1.host === info2.host)

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 11b439e..68b4549 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -155,6 +155,23 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.PostgresQuirks"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.NoQuirks"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.MySQLQuirks")
+          ) ++ Seq(
+            // SPARK-10381 Fix types / units in private AskPermissionToCommitOutput RPC message.
+            // This class is marked as `private` but MiMa still seems to be confused by the change.
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.task"),
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+               "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$2"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.taskAttempt"),
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.copy$default$3"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.this"),
+            ProblemFilters.exclude[IncompatibleMethTypeProblem](
+              "org.apache.spark.scheduler.AskPermissionToCommitOutput.apply")
           )
 
         case v if v.startsWith("1.3") =>

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 29a47f5..363cb18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -433,8 +433,7 @@ private[sql] abstract class BaseWriterContainer(
   protected def initWriters(): Unit
 
   def commitTask(): Unit = {
-    SparkHadoopMapRedUtil.commitTask(
-      outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
+    SparkHadoopMapRedUtil.commitTask(outputCommitter, taskAttemptContext, jobId.getId, taskId.getId)
   }
 
   def abortTask(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/df9e3947/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index e1e8e47..1de8d61 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -119,7 +119,7 @@ private[hive] class SparkHiveWriterContainer(
   }
 
   protected def commit() {
-    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID)
+    SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
   }
 
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {

