You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/08/09 08:39:47 UTC
spark git commit: [SPARK-16522][MESOS] Spark application throws
exception on exit.
Repository: spark
Updated Branches:
refs/heads/master 801e4d097 -> af710e5bd
[SPARK-16522][MESOS] Spark application throws exception on exit.
## What changes were proposed in this pull request?
Spark applications running on Mesos throw exception upon exit. For details, refer to https://issues.apache.org/jira/browse/SPARK-16522.
I am not sure if there is any better fix, so wait for review comments.
## How was this patch tested?
Manual test. Observed that the exception is gone upon application exit.
Author: Sun Rui <su...@gmail.com>
Closes #14175 from sun-rui/SPARK-16522.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af710e5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af710e5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af710e5b
Branch: refs/heads/master
Commit: af710e5bdda9da04dbba615e219e7e496ca82acc
Parents: 801e4d0
Author: Sun Rui <su...@gmail.com>
Authored: Tue Aug 9 09:39:45 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Aug 9 09:39:45 2016 +0100
----------------------------------------------------------------------
.../MesosCoarseGrainedSchedulerBackend.scala | 7 ++++-
...esosCoarseGrainedSchedulerBackendSuite.scala | 33 ++++++++++++++++++++
2 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/af710e5b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 263e619..5177557 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -553,7 +553,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
taskId: String,
reason: String): Unit = {
stateLock.synchronized {
- removeExecutor(taskId, SlaveLost(reason))
+ // Do not call removeExecutor() after this scheduler backend was stopped because
+ // removeExecutor() internally will send a message to the driver endpoint but
+ // the driver endpoint is not available now, otherwise an exception will be thrown.
+ if (!stopCalled) {
+ removeExecutor(taskId, SlaveLost(reason))
+ }
slaves(slaveId).taskIDs.remove(taskId)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/af710e5b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index a74fdf7..0e66979 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
@@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.mesos.Utils._
@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private var backend: MesosCoarseGrainedSchedulerBackend = _
private var externalShuffleClient: MesosExternalShuffleClient = _
private var driverEndpoint: RpcEndpointRef = _
+ @volatile private var stopCalled = false
test("mesos supports killing and limiting executors") {
setBackend()
@@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!dockerInfo.getForcePullImage)
}
+ test("Do not call removeExecutor() after backend is stopped") {
+ setBackend()
+
+ // launches a task on a valid offer
+ val offers = List((backend.executorMemory(sc), 1))
+ offerResources(offers)
+ verifyTaskLaunched(driver, "o1")
+
+ // launches a thread simulating status update
+ val statusUpdateThread = new Thread {
+ override def run(): Unit = {
+ while (!stopCalled) {
+ Thread.sleep(100)
+ }
+
+ val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
+ backend.statusUpdate(driver, status)
+ }
+ }.start
+
+ backend.stop()
+ // Any method of the backend involving sending messages to the driver endpoint should not
+ // be called after the backend is stopped.
+ verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+ }
+
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
mesosDriver = newDriver
}
+ override def stopExecutors(): Unit = {
+ stopCalled = true
+ }
+
markRegistered()
}
backend.start()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org