Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/13 06:55:43 UTC

git commit: [SPARK-3469] Make sure all TaskCompletionListeners are called even with failures

Repository: spark
Updated Branches:
  refs/heads/master 6d887db78 -> 2584ea5b2


[SPARK-3469] Make sure all TaskCompletionListeners are called even with failures

This is necessary because we rely on this callback interface to clean up resources. Under the old behavior, a failure in one listener prevented the remaining listeners from running, which would lead to resource leaks.

Note that this also changes the fault semantics of TaskCompletionListener. Previously, a failure in a TaskCompletionListener propagated immediately, skipping any listeners that had not yet run. With this change, all listeners are invoked first and the failure is reported at the end as a TaskCompletionListenerException that aggregates all of the exception messages.
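
To make the new semantics concrete, here is a minimal sketch mirroring the test added below. Note that markTaskCompleted() and TaskCompletionListenerException are private[spark], so this only compiles from code living under the org.apache.spark package; the listener bodies are purely illustrative:

    import org.apache.spark.TaskContext
    import org.apache.spark.util.TaskCompletionListenerException

    val context = new TaskContext(0, 0, 0)
    // Listeners run in reverse registration order; a failure in one no longer
    // prevents the others from running.
    context.addTaskCompletionListener(_ => throw new Exception("cleanup failed"))
    context.addTaskCompletionListener(_ => println("this listener still runs"))

    try {
      context.markTaskCompleted()
    } catch {
      case e: TaskCompletionListenerException =>
        // Aggregated message from all failed listeners; here just "cleanup failed".
        println(e.getMessage)
    }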

Author: Reynold Xin <rx...@apache.org>

Closes #2343 from rxin/taskcontext-callback and squashes the following commits:

a3845b2 [Reynold Xin] Mark TaskCompletionListenerException as private[spark].
ac5baea [Reynold Xin] Removed obsolete comment.
aa68ea4 [Reynold Xin] Throw an exception if task completion callback fails.
29b6162 [Reynold Xin] oops compilation failed.
1cb444d [Reynold Xin] [SPARK-3469] Call all TaskCompletionListeners even if some fail.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2584ea5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2584ea5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2584ea5b

Branch: refs/heads/master
Commit: 2584ea5b23b1c5a4df9549b94bfc9b8e0900532e
Parents: 6d887db
Author: Reynold Xin <rx...@apache.org>
Authored: Fri Sep 12 21:55:39 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Sep 12 21:55:39 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskContext.scala    | 18 +++++++++--
 .../util/TaskCompletionListenerException.scala  | 34 ++++++++++++++++++++
 .../spark/scheduler/TaskContextSuite.scala      | 22 +++++++++++--
 3 files changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2584ea5b/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 2b99b8a..51b3e4d 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.TaskCompletionListener
+import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener}
 
 
 /**
@@ -41,7 +41,7 @@ class TaskContext(
     val attemptId: Long,
     val runningLocally: Boolean = false,
     private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty)
-  extends Serializable {
+  extends Serializable with Logging {
 
   @deprecated("use partitionId", "0.8.1")
   def splitId = partitionId
@@ -103,8 +103,20 @@ class TaskContext(
   /** Marks the task as completed and triggers the listeners. */
   private[spark] def markTaskCompleted(): Unit = {
     completed = true
+    val errorMsgs = new ArrayBuffer[String](2)
     // Process complete callbacks in the reverse order of registration
-    onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) }
+    onCompleteCallbacks.reverse.foreach { listener =>
+      try {
+        listener.onTaskCompletion(this)
+      } catch {
+        case e: Throwable =>
+          errorMsgs += e.getMessage
+          logError("Error in TaskCompletionListener", e)
+      }
+    }
+    if (errorMsgs.nonEmpty) {
+      throw new TaskCompletionListenerException(errorMsgs)
+    }
   }
 
   /** Marks the task for interruption, i.e. cancellation. */
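
For context on why this guarantee matters: a common pattern is to register a per-partition cleanup listener, and a failure in one listener must not prevent the others from running. A hedged sketch using the DeveloperApi mapPartitionsWithContext (rdd, openReader, and process are hypothetical names, not part of this commit):

    rdd.mapPartitionsWithContext { (context, iter) =>
      val reader = openReader()                        // hypothetical resource
      // Close the resource when the task completes; with this patch the close
      // still runs even if another registered listener throws.
      context.addTaskCompletionListener(_ => reader.close())
      iter.map(line => process(reader, line))          // hypothetical processing
    }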

http://git-wip-us.apache.org/repos/asf/spark/blob/2584ea5b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
new file mode 100644
index 0000000..f64e069
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Exception thrown when there is an exception in
+ * executing the callback in TaskCompletionListener.
+ */
+private[spark]
+class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception {
+
+  override def getMessage: String = {
+    if (errorMessages.size == 1) {
+      errorMessages.head
+    } else {
+      errorMessages.zipWithIndex.map { case (msg, i) => s"Exception $i: $msg" }.mkString("\n")
+    }
+  }
+}
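
For illustration, when more than one listener fails, getMessage numbers and joins the messages; a hypothetical example (the class is private[spark]):

    val e = new TaskCompletionListenerException(
      Seq("disk cleanup failed", "stream not closed"))
    println(e.getMessage)
    // Exception 0: disk cleanup failed
    // Exception 1: stream not closed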

http://git-wip-us.apache.org/repos/asf/spark/blob/2584ea5b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index db2ad82..faba550 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -17,16 +17,20 @@
 
 package org.apache.spark.scheduler
 
+import org.mockito.Mockito._
+import org.mockito.Matchers.any
+
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener}
+
 
 class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
-  test("Calls executeOnCompleteCallbacks after failure") {
+  test("calls TaskCompletionListener after failure") {
     TaskContextSuite.completed = false
     sc = new SparkContext("local", "test")
     val rdd = new RDD[String](sc, List()) {
@@ -45,6 +49,20 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
     }
     assert(TaskContextSuite.completed === true)
   }
+
+  test("all TaskCompletionListeners should be called even if some fail") {
+    val context = new TaskContext(0, 0, 0)
+    val listener = mock(classOf[TaskCompletionListener])
+    context.addTaskCompletionListener(_ => throw new Exception("blah"))
+    context.addTaskCompletionListener(listener)
+    context.addTaskCompletionListener(_ => throw new Exception("blah"))
+
+    intercept[TaskCompletionListenerException] {
+      context.markTaskCompleted()
+    }
+
+    verify(listener, times(1)).onTaskCompletion(any())
+  }
 }
 
 private object TaskContextSuite {

