Posted to commits@spark.apache.org by do...@apache.org on 2019/05/10 17:53:51 UTC
[spark] branch master updated: [SPARK-27347][MESOS] Fix supervised driver retry logic for outdated tasks
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bcd3b61 [SPARK-27347][MESOS] Fix supervised driver retry logic for outdated tasks
bcd3b61 is described below
commit bcd3b61c4be98565352491a108e6394670a0f413
Author: Sam Tran <st...@mesosphere.com>
AuthorDate: Fri May 10 10:53:31 2019 -0700
[SPARK-27347][MESOS] Fix supervised driver retry logic for outdated tasks
## What changes were proposed in this pull request?
This patch fixes a bug where `--supervised` Spark jobs would be retried multiple times whenever an agent crashed, came back, and re-registered, even when those jobs had already been relaunched on a different agent.
That is:
```
- supervised driver is running on agent1
- agent1 crashes
- driver is relaunched on another agent as `<task-id>-retry-1`
- agent1 comes back online and re-registers with scheduler
- spark relaunches the same job as `<task-id>-retry-2`
- now there are two jobs running simultaneously
```
This happened because, when an agent came back and re-registered, it would send a `TASK_FAILED` status update for its old driver task. The previous logic would indiscriminately remove the `submissionId` from ZooKeeper's `launchedDrivers` node and add it to the `retryList` node. Then, when a new offer came in, the scheduler would launch another `-retry-` task even though one was already running.
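At a high level, the fix adds a guard in the status-update path: if an update refers to a task ID that is neither the currently tracked task for that submission nor a submission queued for retry, it is treated as stale and ignored. A condensed sketch of the change (the full diff is below):
```
// Condensed from the diff below: drop stale updates before the relaunch logic runs.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
  if (isTaskOutdated(taskId, state)) {
    // Prevent an outdated task from overwriting a more recent status
    return
  }
  removeFromLaunchedDrivers(subId)
  // ... existing retry bookkeeping ...
}

// A task is outdated when its ID no longer matches the tracked task and
// its submission is not already pending retry.
private def isTaskOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean =
  taskId != state.taskId.getValue &&
    !pendingRetryDrivers.exists(_.submissionId == state.driverDescription.submissionId)
```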
For example logs, see the snippets at the bottom of this description.
## How was this patch tested?
- Added a unit test to simulate the behavior described above (a condensed sketch of the test follows the manual steps below)
- Tested manually on a DC/OS cluster by
```
- launching a --supervised spark job
- dcos node ssh <to the agent with the running spark-driver>
- systemctl stop dcos-mesos-slave
- docker kill <driver-container-id>
- [ wait until spark job is relaunched ]
- systemctl start dcos-mesos-slave
- [ observe spark driver is not relaunched as `-retry-2` ]
```
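The new unit test in the diff below exercises this sequence end to end. A condensed sketch of what it does (`taskLost` and `taskFailed` are illustrative shorthand for the `TaskStatus.newBuilder()` calls in the actual test, not helpers that exist in the suite):
```
// Condensed sketch of the new test in MesosClusterSchedulerSuite (see diff below).
// taskLost/taskFailed are illustrative shorthand for TaskStatus builders.
scheduler.resourceOffers(driver, Collections.singletonList(offer1)) // driver launched on agent s1
scheduler.statusUpdate(driver, taskLost(submissionId, agent1))      // s1 lost -> driver queued for retry
scheduler.resourceOffers(driver, Collections.singletonList(offer2)) // relaunched as <task-id>-retry-1 on s2
scheduler.statusUpdate(driver, taskFailed(submissionId, agent1))    // stale TASK_FAILED from re-registered s1
scheduler.resourceOffers(driver, Collections.singletonList(offer3)) // must NOT produce a -retry-2 task
assert(scheduler.getSchedulerState().launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
```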
Log snippets included below. Notice that the `-retry-1` task is already running when the status update for the old task comes in later:
```
19/01/15 19:21:38 TRACE MesosClusterScheduler: Received offers from Mesos:
... [offers] ...
19/01/15 19:21:39 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2532 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001"
...
19/01/15 19:21:42 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_STARTING message=''
19/01/15 19:21:43 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_RUNNING message=''
...
19/01/15 19:29:12 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_LOST message='health check timed out' reason=REASON_SLAVE_REMOVED
...
19/01/15 19:31:12 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2681 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-1"
...
19/01/15 19:31:15 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_STARTING message=''
19/01/15 19:31:16 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_RUNNING message=''
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Unreachable agent re-reregistered'
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Abnormal executor termination: unknown container' reason=REASON_EXECUTOR_TERMINATED
19/01/15 19:33:45 ERROR MesosClusterScheduler: Unable to find driver with driver-20190115192138-0001 in status update
...
19/01/15 19:33:47 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2729 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-2"
...
19/01/15 19:33:50 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_STARTING message=''
19/01/15 19:33:51 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_RUNNING message=''
```
Closes #24276 from samvantran/SPARK-27347-duplicate-retries.
Authored-by: Sam Tran <st...@mesosphere.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../cluster/mesos/MesosClusterScheduler.scala | 14 ++++
.../cluster/mesos/MesosClusterSchedulerSuite.scala | 80 ++++++++++++++++++++++
2 files changed, 94 insertions(+)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 3ffccb0..8566a30 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -766,6 +766,10 @@ private[spark] class MesosClusterScheduler(
val state = launchedDrivers(subId)
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
+ if (isTaskOutdated(taskId, state)) {
+ // Prevent outdated task from overwriting a more recent status
+ return
+ }
removeFromLaunchedDrivers(subId)
state.finishDate = Some(new Date())
val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
@@ -786,6 +790,16 @@ private[spark] class MesosClusterScheduler(
}
}
+ /**
+ * Check if the task has already been launched or is pending retry
+ * If neither, the taskId is outdated and should be ignored
+ * This is to avoid scenarios where an outdated status update arrives
+ * after a supervised driver has already been relaunched
+ */
+ private def isTaskOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean =
+ taskId != state.taskId.getValue &&
+ !pendingRetryDrivers.exists(_.submissionId == state.driverDescription.submissionId)
+
private def retireDriver(
submissionId: String,
state: MesosClusterSubmissionState) = {
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 536f5a2..924a991 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -450,6 +450,86 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(state.finishedDrivers.size == 1)
}
+ test("SPARK-27347: do not restart outdated supervised drivers") {
+ // Covers scenario where:
+ // - agent goes down
+ // - supervised job is relaunched on another agent
+ // - first agent re-registers and sends status update: TASK_FAILED
+ // - job should NOT be relaunched again
+ val conf = new SparkConf()
+ conf.setMaster("mesos://localhost:5050")
+ conf.setAppName("SparkMesosDriverRetries")
+ setScheduler(conf.getAll.toMap)
+
+ val mem = 1000
+ val cpu = 1
+ val offers = List(
+ Utils.createOffer("o1", "s1", mem, cpu, None),
+ Utils.createOffer("o2", "s2", mem, cpu, None),
+ Utils.createOffer("o3", "s1", mem, cpu, None))
+
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+ Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
+ assert(response.success)
+
+ // Offer a resource to launch the submitted driver
+ scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
+ var state = scheduler.getSchedulerState()
+ assert(state.launchedDrivers.size == 1)
+
+ // Signal agent lost with a TASK_LOST status update
+ val agent1 = SlaveID.newBuilder().setValue("s1").build()
+ var taskStatus = TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+ .setSlaveId(agent1)
+ .setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
+ .setState(MesosTaskState.TASK_LOST)
+ .build()
+
+ scheduler.statusUpdate(driver, taskStatus)
+ state = scheduler.getSchedulerState()
+ assert(state.pendingRetryDrivers.size == 1)
+ assert(state.pendingRetryDrivers.head.submissionId == taskStatus.getTaskId.getValue)
+ assert(state.launchedDrivers.isEmpty)
+
+ // Offer new resource to retry driver on a new agent
+ Thread.sleep(1500) // sleep to cover nextRetry's default wait time of 1s
+ scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
+
+ val agent2 = SlaveID.newBuilder().setValue("s2").build()
+ taskStatus = TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+ .setSlaveId(agent2)
+ .setState(MesosTaskState.TASK_RUNNING)
+ .build()
+
+ scheduler.statusUpdate(driver, taskStatus)
+ state = scheduler.getSchedulerState()
+ assert(state.pendingRetryDrivers.isEmpty)
+ assert(state.launchedDrivers.size == 1)
+ assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+ assert(state.launchedDrivers.head.taskId.getValue != taskStatus.getTaskId.getValue)
+
+ // Agent1 comes back online and sends status update: TASK_FAILED
+ taskStatus = TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+ .setSlaveId(agent1)
+ .setState(MesosTaskState.TASK_FAILED)
+ .setMessage("Abnormal executor termination")
+ .setReason(TaskStatus.Reason.REASON_EXECUTOR_TERMINATED)
+ .build()
+
+ scheduler.statusUpdate(driver, taskStatus)
+ scheduler.resourceOffers(driver, Collections.singletonList(offers.last))
+
+ // Assert driver does not restart 2nd time
+ state = scheduler.getSchedulerState()
+ assert(state.pendingRetryDrivers.isEmpty)
+ assert(state.launchedDrivers.size == 1)
+ assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+ }
+
test("Declines offer with refuse seconds = 120.") {
setScheduler()