Posted to commits@spark.apache.org by ka...@apache.org on 2017/02/24 21:04:03 UTC

spark git commit: [SPARK-19597][CORE] test case for task deserialization errors

Repository: spark
Updated Branches:
  refs/heads/master 5cbd3b59b -> 5f74148bb


[SPARK-19597][CORE] test case for task deserialization errors

Adds a test case ensuring that Executors gracefully handle a task that fails to deserialize, by sending back a reasonable failure message. This does not change any behavior (the prior behavior was already correct); it just adds a test case to prevent regressions.

Author: Imran Rashid <ir...@cloudera.com>

Closes #16930 from squito/executor_task_deserialization.
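
For context, the new test follows this pattern (a condensed, illustrative sketch based on the diff below, not the exact test code):

    // A task whose deserialization always fails on the executor side
    // (mirrors NonDeserializableTask in the ExecutorSuite diff below):
    private class NonDeserializableTask extends FakeTask(0, 0) with Externalizable {
      def writeExternal(out: ObjectOutput): Unit = {}
      def readExternal(in: ObjectInput): Unit = {
        throw new RuntimeException("failure in deserialization")
      }
    }

    // The test serializes this task, launches it on a bare Executor, waits for the
    // task to finish, and then asserts that the ExecutorBackend received a FAILED
    // status update whose payload deserializes to an ExceptionFailure carrying the
    // original error message -- i.e. the executor reports the failure rather than dying.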


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f74148b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f74148b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f74148b

Branch: refs/heads/master
Commit: 5f74148bb45912b9f867174de46e246215c93ee1
Parents: 5cbd3b5
Author: Imran Rashid <ir...@cloudera.com>
Authored: Fri Feb 24 13:03:37 2017 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Feb 24 13:03:37 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    |   2 +
 .../apache/spark/executor/ExecutorSuite.scala   | 139 ++++++++++++++-----
 2 files changed, 108 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5f74148b/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d762f11..975a6e4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -148,6 +148,8 @@ private[spark] class Executor(
 
   startDriverHeartbeater()
 
+  private[executor] def numRunningTasks: Int = runningTasks.size()
+
   def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
     val tr = new TaskRunner(context, taskDescription)
     runningTasks.put(taskDescription.taskId, tr)
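
The only change to production code is this package-private accessor; it exists so the new test can poll for task completion. A sketch of that polling, mirroring the runTaskAndGetFailReason helper in the ExecutorSuite diff below (not quoted exactly):

    // Wait up to 5 seconds for the launched task to finish before verifying the
    // status updates sent to the mocked ExecutorBackend (uses ScalaTest's Eventually).
    eventually(timeout(5 seconds), interval(10 milliseconds)) {
      assert(executor.numRunningTasks === 0)
    }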

http://git-wip-us.apache.org/repos/asf/spark/blob/5f74148b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index f94baaa..b743ff5 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -17,16 +17,21 @@
 
 package org.apache.spark.executor
 
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import java.nio.ByteBuffer
 import java.util.Properties
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.mutable.Map
+import scala.concurrent.duration._
 
-import org.mockito.Matchers._
-import org.mockito.Mockito.{mock, when}
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito.{inOrder, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
+import org.scalatest.concurrent.Eventually
+import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -36,35 +41,15 @@ import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.{FakeTask, TaskDescription}
 import org.apache.spark.serializer.JavaSerializer
 
-class ExecutorSuite extends SparkFunSuite {
+class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually {
 
   test("SPARK-15963: Catch `TaskKilledException` correctly in Executor.TaskRunner") {
     // mock some objects to make Executor.launchTask() happy
     val conf = new SparkConf
     val serializer = new JavaSerializer(conf)
-    val mockEnv = mock(classOf[SparkEnv])
-    val mockRpcEnv = mock(classOf[RpcEnv])
-    val mockMetricsSystem = mock(classOf[MetricsSystem])
-    val mockMemoryManager = mock(classOf[MemoryManager])
-    when(mockEnv.conf).thenReturn(conf)
-    when(mockEnv.serializer).thenReturn(serializer)
-    when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
-    when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
-    when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
-    when(mockEnv.closureSerializer).thenReturn(serializer)
-    val fakeTaskMetrics = serializer.newInstance().serialize(TaskMetrics.registered).array()
-    val serializedTask = serializer.newInstance().serialize(
-      new FakeTask(0, 0, Nil, fakeTaskMetrics))
-    val taskDescription = new TaskDescription(
-      taskId = 0,
-      attemptNumber = 0,
-      executorId = "",
-      name = "",
-      index = 0,
-      addedFiles = Map[String, Long](),
-      addedJars = Map[String, Long](),
-      properties = new Properties,
-      serializedTask)
+    val env = createMockEnv(conf, serializer)
+    val serializedTask = serializer.newInstance().serialize(new FakeTask(0, 0))
+    val taskDescription = createFakeTaskDescription(serializedTask)
 
     // we use latches to force the program to run in this order:
     // +-----------------------------+---------------------------------------+
@@ -86,7 +71,7 @@ class ExecutorSuite extends SparkFunSuite {
 
     val executorSuiteHelper = new ExecutorSuiteHelper
 
-    val mockExecutorBackend = mock(classOf[ExecutorBackend])
+    val mockExecutorBackend = mock[ExecutorBackend]
     when(mockExecutorBackend.statusUpdate(any(), any(), any()))
       .thenAnswer(new Answer[Unit] {
         var firstTime = true
@@ -102,8 +87,8 @@ class ExecutorSuite extends SparkFunSuite {
             val taskState = invocationOnMock.getArguments()(1).asInstanceOf[TaskState]
             executorSuiteHelper.taskState = taskState
             val taskEndReason = invocationOnMock.getArguments()(2).asInstanceOf[ByteBuffer]
-            executorSuiteHelper.testFailedReason
-              = serializer.newInstance().deserialize(taskEndReason)
+            executorSuiteHelper.testFailedReason =
+              serializer.newInstance().deserialize(taskEndReason)
             // let the main test thread check `taskState` and `testFailedReason`
             executorSuiteHelper.latch3.countDown()
           }
@@ -112,16 +97,20 @@ class ExecutorSuite extends SparkFunSuite {
 
     var executor: Executor = null
     try {
-      executor = new Executor("id", "localhost", mockEnv, userClassPath = Nil, isLocal = true)
+      executor = new Executor("id", "localhost", env, userClassPath = Nil, isLocal = true)
       // the task will be launched in a dedicated worker thread
       executor.launchTask(mockExecutorBackend, taskDescription)
 
-      executorSuiteHelper.latch1.await()
+      if (!executorSuiteHelper.latch1.await(5, TimeUnit.SECONDS)) {
+        fail("executor did not send first status update in time")
+      }
       // we know the task will be started, but not yet deserialized, because of the latches we
       // use in mockExecutorBackend.
       executor.killAllTasks(true)
       executorSuiteHelper.latch2.countDown()
-      executorSuiteHelper.latch3.await()
+      if (!executorSuiteHelper.latch3.await(5, TimeUnit.SECONDS)) {
+        fail("executor did not send second status update in time")
+      }
 
       // `testFailedReason` should be `TaskKilled`; `taskState` should be `KILLED`
       assert(executorSuiteHelper.testFailedReason === TaskKilled)
@@ -133,6 +122,79 @@ class ExecutorSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("Gracefully handle error in task deserialization") {
+    val conf = new SparkConf
+    val serializer = new JavaSerializer(conf)
+    val env = createMockEnv(conf, serializer)
+    val serializedTask = serializer.newInstance().serialize(new NonDeserializableTask)
+    val taskDescription = createFakeTaskDescription(serializedTask)
+
+    val failReason = runTaskAndGetFailReason(taskDescription)
+    failReason match {
+      case ef: ExceptionFailure =>
+        assert(ef.exception.isDefined)
+        assert(ef.exception.get.getMessage() === NonDeserializableTask.errorMsg)
+      case _ =>
+        fail(s"unexpected failure type: $failReason")
+    }
+  }
+
+  private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
+    val mockEnv = mock[SparkEnv]
+    val mockRpcEnv = mock[RpcEnv]
+    val mockMetricsSystem = mock[MetricsSystem]
+    val mockMemoryManager = mock[MemoryManager]
+    when(mockEnv.conf).thenReturn(conf)
+    when(mockEnv.serializer).thenReturn(serializer)
+    when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
+    when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
+    when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
+    when(mockEnv.closureSerializer).thenReturn(serializer)
+    SparkEnv.set(mockEnv)
+    mockEnv
+  }
+
+  private def createFakeTaskDescription(serializedTask: ByteBuffer): TaskDescription = {
+    new TaskDescription(
+      taskId = 0,
+      attemptNumber = 0,
+      executorId = "",
+      name = "",
+      index = 0,
+      addedFiles = Map[String, Long](),
+      addedJars = Map[String, Long](),
+      properties = new Properties,
+      serializedTask)
+  }
+
+  private def runTaskAndGetFailReason(taskDescription: TaskDescription): TaskFailedReason = {
+    val mockBackend = mock[ExecutorBackend]
+    var executor: Executor = null
+    try {
+      executor = new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true)
+      // the task will be launched in a dedicated worker thread
+      executor.launchTask(mockBackend, taskDescription)
+      eventually(timeout(5 seconds), interval(10 milliseconds)) {
+        assert(executor.numRunningTasks === 0)
+      }
+    } finally {
+      if (executor != null) {
+        executor.stop()
+      }
+    }
+    val orderedMock = inOrder(mockBackend)
+    val statusCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
+    orderedMock.verify(mockBackend)
+      .statusUpdate(meq(0L), meq(TaskState.RUNNING), statusCaptor.capture())
+    orderedMock.verify(mockBackend)
+      .statusUpdate(meq(0L), meq(TaskState.FAILED), statusCaptor.capture())
+    // first statusUpdate for RUNNING has empty data
+    assert(statusCaptor.getAllValues().get(0).remaining() === 0)
+    // second update is more interesting
+    val failureData = statusCaptor.getAllValues.get(1)
+    SparkEnv.get.closureSerializer.newInstance().deserialize[TaskFailedReason](failureData)
+  }
 }
 
 // Helps to test("SPARK-15963")
@@ -145,3 +207,14 @@ private class ExecutorSuiteHelper {
   @volatile var taskState: TaskState = _
   @volatile var testFailedReason: TaskFailedReason = _
 }
+
+private class NonDeserializableTask extends FakeTask(0, 0) with Externalizable {
+  def writeExternal(out: ObjectOutput): Unit = {}
+  def readExternal(in: ObjectInput): Unit = {
+    throw new RuntimeException(NonDeserializableTask.errorMsg)
+  }
+}
+
+private object NonDeserializableTask {
+  val errorMsg = "failure in deserialization"
+}

