Posted to commits@spark.apache.org by sr...@apache.org on 2016/08/09 08:39:47 UTC

spark git commit: [SPARK-16522][MESOS] Spark application throws exception on exit.

Repository: spark
Updated Branches:
  refs/heads/master 801e4d097 -> af710e5bd


[SPARK-16522][MESOS] Spark application throws exception on exit.

## What changes were proposed in this pull request?
Spark applications running on Mesos throw an exception upon exit. For details, refer to https://issues.apache.org/jira/browse/SPARK-16522.

I am not sure if there is a better fix, so I will wait for review comments.

## How was this patch tested?
Manual test. Observed that the exception no longer occurs on application exit.
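
Editor's note on the failure mode, as described by the new comment in the diff below: during shutdown the driver's RPC endpoint is torn down, but the Mesos scheduler driver can still deliver terminal task status updates; statusUpdate() then calls removeExecutor(), which sends a RemoveExecutor message to the now-unavailable endpoint and throws. A minimal self-contained Scala sketch of the guard pattern (names such as CallbackBackend and sendToDriver are simplified stand-ins, not Spark's API):

    // Sketch of the guard pattern, assuming a callback-driven backend.
    class CallbackBackend {
      @volatile private var stopCalled = false
      private val stateLock = new Object

      def stop(): Unit = stateLock.synchronized {
        stopCalled = true
        // ... the driver endpoint is torn down after this point ...
      }

      // Invoked asynchronously by the cluster manager, possibly after stop().
      def statusUpdate(taskId: String, reason: String): Unit =
        stateLock.synchronized {
          if (!stopCalled) {
            sendToDriver(s"RemoveExecutor($taskId, $reason)") // endpoint still alive
          }
          // per-task bookkeeping still runs unconditionally, mirroring the patch
        }

      private def sendToDriver(msg: String): Unit = println(s"driver <- $msg")
    }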

Author: Sun Rui <su...@gmail.com>

Closes #14175 from sun-rui/SPARK-16522.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af710e5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af710e5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af710e5b

Branch: refs/heads/master
Commit: af710e5bdda9da04dbba615e219e7e496ca82acc
Parents: 801e4d0
Author: Sun Rui <su...@gmail.com>
Authored: Tue Aug 9 09:39:45 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Aug 9 09:39:45 2016 +0100

----------------------------------------------------------------------
 .../MesosCoarseGrainedSchedulerBackend.scala    |  7 ++++-
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 33 ++++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af710e5b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 263e619..5177557 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -553,7 +553,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       taskId: String,
       reason: String): Unit = {
     stateLock.synchronized {
-      removeExecutor(taskId, SlaveLost(reason))
+      // Do not call removeExecutor() after this scheduler backend has stopped:
+      // removeExecutor() internally sends a message to the driver endpoint, but
+      // that endpoint is no longer available, so the send would throw an exception.
+      if (!stopCalled) {
+        removeExecutor(taskId, SlaveLost(reason))
+      }
       slaves(slaveId).taskIDs.remove(taskId)
     }
   }
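
The hunk above shows only the read side of the guard; a hedged sketch of the assumed write side follows, with stop() publishing stopCalled under the same stateLock that the status-update path takes (the exact Spark implementation may differ):

    // Sketch, not the verbatim Spark code: a late callback sees either
    // "running" (endpoint alive) or "stopped" (skip the RPC), never an
    // endpoint in mid-teardown.
    trait DriverEndpointOwner {
      @volatile protected var stopCalled = false
      protected val stateLock = new Object

      protected def shutDownDriverEndpoint(): Unit

      def stop(): Unit = {
        stateLock.synchronized { stopCalled = true }
        shutDownDriverEndpoint() // callbacks arriving from here on skip the RPC
      }
    }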

http://git-wip-us.apache.org/repos/asf/spark/blob/af710e5b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index a74fdf7..0e66979 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos._
@@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.mesos.Utils._
 
@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   private var backend: MesosCoarseGrainedSchedulerBackend = _
   private var externalShuffleClient: MesosExternalShuffleClient = _
   private var driverEndpoint: RpcEndpointRef = _
+  @volatile private var stopCalled = false
 
   test("mesos supports killing and limiting executors") {
     setBackend()
@@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(!dockerInfo.getForcePullImage)
   }
 
+  test("Do not call removeExecutor() after backend is stopped") {
+    setBackend()
+
+    // launches a task on a valid offer
+    val offers = List((backend.executorMemory(sc), 1))
+    offerResources(offers)
+    verifyTaskLaunched(driver, "o1")
+
+    // launches a thread simulating a status update that arrives during shutdown
+    new Thread {
+      override def run(): Unit = {
+        while (!stopCalled) {
+          Thread.sleep(100)
+        }
+
+        val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
+        backend.statusUpdate(driver, status)
+      }
+    }.start()
+
+    backend.stop()
+    // No method of the backend that sends messages to the driver endpoint
+    // should be called after the backend is stopped.
+    verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
@@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
         mesosDriver = newDriver
       }
 
+      override def stopExecutors(): Unit = {
+        stopCalled = true
+      }
+
       markRegistered()
     }
     backend.start()
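
For readers unfamiliar with the verification style in the new test: Mockito's never() makes verify() pass only if the given method was not invoked at all. A self-contained sketch of the same pattern, using a hypothetical Endpoint trait and the Mockito 1.x Matchers class that Spark used at the time (Mockito 2+ renames it ArgumentMatchers):

    import org.mockito.Matchers.anyString
    import org.mockito.Mockito.{mock, never, verify}

    trait Endpoint { def send(msg: String): Unit }

    object NeverCalledExample {
      def main(args: Array[String]): Unit = {
        val endpoint = mock(classOf[Endpoint])

        // ... exercise code that must NOT talk to the endpoint after shutdown ...

        // Passes only if send() was never invoked with any argument;
        // otherwise Mockito throws a verification error.
        verify(endpoint, never()).send(anyString())
      }
    }

In the test itself, the overridden stopExecutors() hook flips the suite's stopCalled flag, so the simulated status update is delivered only once backend.stop() is underway.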

