Posted to commits@spark.apache.org by do...@apache.org on 2019/05/10 17:54:08 UTC

[spark] branch branch-2.4 updated: [SPARK-27347][MESOS] Fix supervised driver retry logic for outdated tasks

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 95c55f6  [SPARK-27347][MESOS] Fix supervised driver retry logic for outdated tasks
95c55f6 is described below

commit 95c55f6942d32947d10e79007968a4f0b26a162b
Author: Sam Tran <st...@mesosphere.com>
AuthorDate: Fri May 10 10:53:31 2019 -0700

    [SPARK-27347][MESOS] Fix supervised driver retry logic for outdated tasks
    
    ## What changes were proposed in this pull request?
    
    This patch fixes a bug where supervised (`--supervise`) Spark drivers were retried multiple times whenever an agent crashed, came back, and re-registered, even when those drivers had already been relaunched on a different agent.
    
    That is:
    ```
    - supervised driver is running on agent1
    - agent1 crashes
    - driver is relaunched on another agent as `<task-id>-retry-1`
    - agent1 comes back online and re-registers with scheduler
    - spark relaunches the same job as `<task-id>-retry-2`
    - now there are two jobs running simultaneously
    ```
    
    This happened because an agent that came back and re-registered sent a `TASK_FAILED` status update for its old driver task. The previous logic indiscriminately removed the `submissionId` from ZooKeeper's `launchedDrivers` node and added it to the `retryList` node. Then, when a new offer came in, the scheduler launched another `-retry-` task even though one was already running.
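    
    As a minimal sketch of the guard this patch adds, the check can be modelled in isolation. The `Submission` case class, the `OutdatedTaskCheck` object, and the `pendingRetryIds` set are hypothetical simplifications for illustration, not the real `MesosClusterScheduler` types:
    ```scala
    // Hypothetical, simplified stand-in for the scheduler's per-submission state.
    final case class Submission(submissionId: String, currentTaskId: String)

    object OutdatedTaskCheck {
      // A status update is treated as outdated when it names a task other than
      // the one currently launched for the submission AND no retry is pending
      // for that submission.
      def isTaskOutdated(
          taskId: String,
          launched: Submission,
          pendingRetryIds: Set[String]): Boolean =
        taskId != launched.currentTaskId &&
          !pendingRetryIds.contains(launched.submissionId)
    }
    ```
    
    Under this model, a late `TASK_FAILED` for the original task ID is ignored once `-retry-1` is the launched task, while an update for the currently launched task is still processed.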
    
    Example logs are included at the bottom of this description.
    
    ## How was this patch tested?
    
    - Added a unit test to simulate behavior described above
    - Tested manually on a DC/OS cluster by
      ```
      - launching a --supervise spark job
      - dcos node ssh <to the agent with the running spark-driver>
      - systemctl stop dcos-mesos-slave
      - docker kill <driver-container-id>
      - [ wait until spark job is relaunched ]
      - systemctl start dcos-mesos-slave
      - [ observe spark driver is not relaunched as `-retry-2` ]
      ```
    
    Log snippets are included below. Note that the `-retry-1` task is already running when the status update for the old task arrives:
    ```
    19/01/15 19:21:38 TRACE MesosClusterScheduler: Received offers from Mesos:
    ... [offers] ...
    19/01/15 19:21:39 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2532 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001"
    ...
    19/01/15 19:21:42 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_STARTING message=''
    19/01/15 19:21:43 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_RUNNING message=''
    ...
    19/01/15 19:29:12 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_LOST message='health check timed out' reason=REASON_SLAVE_REMOVED
    ...
    19/01/15 19:31:12 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2681 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-1"
    ...
    19/01/15 19:31:15 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_STARTING message=''
    19/01/15 19:31:16 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_RUNNING message=''
    ...
    19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Unreachable agent re-reregistered'
    ...
    19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Abnormal executor termination: unknown container' reason=REASON_EXECUTOR_TERMINATED
    19/01/15 19:33:45 ERROR MesosClusterScheduler: Unable to find driver with driver-20190115192138-0001 in status update
    ...
    19/01/15 19:33:47 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2729 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-2"
    ...
    19/01/15 19:33:50 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_STARTING message=''
    19/01/15 19:33:51 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_RUNNING message=''
    ```
    
    Closes #24276 from samvantran/SPARK-27347-duplicate-retries.
    
    Authored-by: Sam Tran <st...@mesosphere.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit bcd3b61c4be98565352491a108e6394670a0f413)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../cluster/mesos/MesosClusterScheduler.scala      | 14 ++++
 .../cluster/mesos/MesosClusterSchedulerSuite.scala | 80 ++++++++++++++++++++++
 2 files changed, 94 insertions(+)

diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8c9a6b4..7a23862 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -769,6 +769,10 @@ private[spark] class MesosClusterScheduler(
         val state = launchedDrivers(subId)
         // Check if the driver is supervise enabled and can be relaunched.
         if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
+          if (isTaskOutdated(taskId, state)) {
+            // Prevent outdated task from overwriting a more recent status
+            return
+          }
           removeFromLaunchedDrivers(subId)
           state.finishDate = Some(new Date())
           val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
@@ -789,6 +793,16 @@ private[spark] class MesosClusterScheduler(
     }
   }
 
+  /**
+   * Check if the task is outdated i.e. has already been launched or is pending
+   * If neither, the taskId is outdated and should be ignored
+   * This is to avoid scenarios where an outdated status update arrives
+   * after a supervised driver has already been relaunched
+   */
+  private def isTaskOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean =
+    taskId != state.taskId.getValue &&
+      !pendingRetryDrivers.exists(_.submissionId == state.driverDescription.submissionId)
+
   private def retireDriver(
       submissionId: String,
       state: MesosClusterSubmissionState) = {
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 580c3a7..11c48b5 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -423,6 +423,86 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(state.finishedDrivers.size == 1)
   }
 
+  test("SPARK-27347: do not restart outdated supervised drivers") {
+    // Covers scenario where:
+    // - agent goes down
+    // - supervised job is relaunched on another agent
+    // - first agent re-registers and sends status update: TASK_FAILED
+    // - job should NOT be relaunched again
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("SparkMesosDriverRetries")
+    setScheduler(conf.getAll.toMap)
+
+    val mem = 1000
+    val cpu = 1
+    val offers = List(
+      Utils.createOffer("o1", "s1", mem, cpu, None),
+      Utils.createOffer("o2", "s2", mem, cpu, None),
+      Utils.createOffer("o3", "s1", mem, cpu, None))
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
+    assert(response.success)
+
+    // Offer a resource to launch the submitted driver
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
+    var state = scheduler.getSchedulerState()
+    assert(state.launchedDrivers.size == 1)
+
+    // Signal agent lost with status with TASK_LOST
+    val agent1 = SlaveID.newBuilder().setValue("s1").build()
+    var taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
+      .setState(MesosTaskState.TASK_LOST)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.size == 1)
+    assert(state.pendingRetryDrivers.head.submissionId == taskStatus.getTaskId.getValue)
+    assert(state.launchedDrivers.isEmpty)
+
+    // Offer new resource to retry driver on a new agent
+    Thread.sleep(1500) // sleep to cover nextRetry's default wait time of 1s
+    scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
+
+    val agent2 = SlaveID.newBuilder().setValue("s2").build()
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent2)
+      .setState(MesosTaskState.TASK_RUNNING)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+    assert(state.launchedDrivers.head.taskId.getValue != taskStatus.getTaskId.getValue)
+
+    // Agent1 comes back online and sends status update: TASK_FAILED
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setState(MesosTaskState.TASK_FAILED)
+      .setMessage("Abnormal executor termination")
+      .setReason(TaskStatus.Reason.REASON_EXECUTOR_TERMINATED)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.last))
+
+    // Assert driver does not restart 2nd time
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+  }
+
   test("Declines offer with refuse seconds = 120.") {
     setScheduler()
 

