You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/08/23 03:31:59 UTC

spark git commit: [SPARK-19326] Speculated task attempts do not get launched in few scenarios

Repository: spark
Updated Branches:
  refs/heads/master 342961905 -> d58a3507e


[SPARK-19326] Speculated task attempts do not get launched in few scenarios

## What changes were proposed in this pull request?

Add a new listener event when a speculative task is created and notify it to ExecutorAllocationManager for requesting more executor.

## How was this patch tested?

- Added Unittests.
- For the test snippet in the jira:
val n = 100
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
if (index == 1) {
Thread.sleep(Long.MaxValue) // fake long running task(s)
}
it.toList.map(x => index + ", " + x).iterator
}).collect
With this code change, spark indicates 101 jobs are running (99 succeeded, 2 running and 1 is speculative job)

Author: Jane Wang <ja...@fb.com>

Closes #18492 from janewangfb/speculated_task_not_launched.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d58a3507
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d58a3507
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d58a3507

Branch: refs/heads/master
Commit: d58a3507ed2d48eabb857c92aecead19a52f4952
Parents: 3429619
Author: Jane Wang <ja...@fb.com>
Authored: Wed Aug 23 11:31:54 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Aug 23 11:31:54 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/SparkFirehoseListener.java |  5 ++
 .../spark/ExecutorAllocationManager.scala       | 61 +++++++++++++++++---
 .../apache/spark/scheduler/DAGScheduler.scala   | 14 +++++
 .../spark/scheduler/DAGSchedulerEvent.scala     |  4 ++
 .../apache/spark/scheduler/SparkListener.scala  | 11 ++++
 .../spark/scheduler/SparkListenerBus.scala      |  2 +
 .../apache/spark/scheduler/TaskSetManager.scala |  1 +
 .../spark/ExecutorAllocationManagerSuite.scala  | 48 ++++++++++++++-
 .../spark/scheduler/TaskSetManagerSuite.scala   |  9 +++
 9 files changed, 144 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 140c52f..3583856 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -140,6 +140,11 @@ public class SparkFirehoseListener implements SparkListenerInterface {
   }
 
   @Override
+  public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) {
+    onEvent(speculativeTask);
+  }
+
+  @Override
   public void onOtherEvent(SparkListenerEvent event) {
     onEvent(event);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 337631a..3350326 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager(
     // If our target has not changed, do not send a message
     // to the cluster manager and reset our exponential growth
     if (delta == 0) {
-      numExecutorsToAdd = 1
-      return 0
+      // Check if there is any speculative jobs pending
+      if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
+        numExecutorsTarget =
+          math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
+      } else {
+        numExecutorsToAdd = 1
+        return 0
+      }
     }
 
     val addRequestAcknowledged = try {
@@ -588,17 +594,22 @@ private[spark] class ExecutorAllocationManager(
    * A listener that notifies the given allocation manager of when to add and remove executors.
    *
    * This class is intentionally conservative in its assumptions about the relative ordering
-   * and consistency of events returned by the listener. For simplicity, it does not account
-   * for speculated tasks.
+   * and consistency of events returned by the listener.
    */
   private class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
-    // Number of tasks currently running on the cluster.  Should be 0 when no stages are active.
+    // Number of tasks currently running on the cluster including speculative tasks.
+    // Should be 0 when no stages are active.
     private var numRunningTasks: Int = _
 
+    // Number of speculative tasks to be scheduled in each stage
+    private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
+    // The speculative tasks started in each stage
+    private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
+
     // stageId to tuple (the number of task with locality preferences, a map where each pair is a
     // node and the number of tasks that would like to be scheduled on that node) map,
     // maintain the executor placement hints for each stage Id used by resource framework to better
@@ -637,7 +648,9 @@ private[spark] class ExecutorAllocationManager(
       val stageId = stageCompleted.stageInfo.stageId
       allocationManager.synchronized {
         stageIdToNumTasks -= stageId
+        stageIdToNumSpeculativeTasks -= stageId
         stageIdToTaskIndices -= stageId
+        stageIdToSpeculativeTaskIndices -= stageId
         stageIdToExecutorPlacementHints -= stageId
 
         // Update the executor placement hints
@@ -645,7 +658,7 @@ private[spark] class ExecutorAllocationManager(
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
-        if (stageIdToNumTasks.isEmpty) {
+        if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
           if (numRunningTasks != 0) {
             logWarning("No stages are running, but numRunningTasks != 0")
@@ -671,7 +684,12 @@ private[spark] class ExecutorAllocationManager(
         }
 
         // If this is the last pending task, mark the scheduler queue as empty
-        stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+        if (taskStart.taskInfo.speculative) {
+          stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) +=
+            taskIndex
+        } else {
+          stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
+        }
         if (totalPendingTasks() == 0) {
           allocationManager.onSchedulerQueueEmpty()
         }
@@ -705,7 +723,11 @@ private[spark] class ExecutorAllocationManager(
           if (totalPendingTasks() == 0) {
             allocationManager.onSchedulerBacklogged()
           }
-          stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) }
+          if (taskEnd.taskInfo.speculative) {
+            stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+          } else {
+            stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
+          }
         }
       }
     }
@@ -726,18 +748,39 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.onExecutorRemoved(executorRemoved.executorId)
     }
 
+    override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
+      : Unit = {
+       val stageId = speculativeTask.stageId
+
+      allocationManager.synchronized {
+        stageIdToNumSpeculativeTasks(stageId) =
+          stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+        allocationManager.onSchedulerBacklogged()
+      }
+    }
+
     /**
      * An estimate of the total number of pending tasks remaining for currently running stages. Does
      * not account for tasks which may have failed and been resubmitted.
      *
      * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
      */
-    def totalPendingTasks(): Int = {
+    def pendingTasks(): Int = {
       stageIdToNumTasks.map { case (stageId, numTasks) =>
         numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
       }.sum
     }
 
+    def pendingSpeculativeTasks(): Int = {
+      stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) =>
+        numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0)
+      }.sum
+    }
+
+    def totalPendingTasks(): Int = {
+      pendingTasks + pendingSpeculativeTasks
+    }
+
     /**
      * The number of tasks currently running across all stages.
      */

http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 21bf9d0..562dd1d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -281,6 +281,13 @@ class DAGScheduler(
     eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))
   }
 
+  /**
+   * Called by the TaskSetManager when it decides a speculative task is needed.
+   */
+  def speculativeTaskSubmitted(task: Task[_]): Unit = {
+    eventProcessLoop.post(SpeculativeTaskSubmitted(task))
+  }
+
   private[scheduler]
   def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
     // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
@@ -812,6 +819,10 @@ class DAGScheduler(
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
 
+  private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
+    listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
+  }
+
   private[scheduler] def handleTaskSetFailed(
       taskSet: TaskSet,
       reason: String,
@@ -1778,6 +1789,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
 
+    case SpeculativeTaskSubmitted(task) =>
+      dagScheduler.handleSpeculativeTaskSubmitted(task)
+
     case GettingResultEvent(taskInfo) =>
       dagScheduler.handleGetTaskResult(taskInfo)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 3f8d563..54ab8f8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -94,3 +94,7 @@ case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Thr
   extends DAGSchedulerEvent
 
 private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
+
+private[scheduler]
+case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent
+

http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 59f89a8..b76e560 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -53,6 +53,9 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T
 case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
 
 @DeveloperApi
+case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
+
+@DeveloperApi
 case class SparkListenerTaskEnd(
     stageId: Int,
     stageAttemptId: Int,
@@ -291,6 +294,11 @@ private[spark] trait SparkListenerInterface {
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
 
   /**
+   * Called when a speculative task is submitted
+   */
+  def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit
+
+  /**
    * Called when other events like SQL-specific events are posted.
    */
   def onOtherEvent(event: SparkListenerEvent): Unit
@@ -354,5 +362,8 @@ abstract class SparkListener extends SparkListenerInterface {
 
   override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
 
+  override def onSpeculativeTaskSubmitted(
+      speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }
+
   override def onOtherEvent(event: SparkListenerEvent): Unit = { }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 3b0d3b1..056c0cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -71,6 +71,8 @@ private[spark] trait SparkListenerBus
         listener.onNodeUnblacklisted(nodeUnblacklisted)
       case blockUpdated: SparkListenerBlockUpdated =>
         listener.onBlockUpdated(blockUpdated)
+      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
+        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
       case _ => listener.onOtherEvent(event)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c2f8178..3804ea8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -966,6 +966,7 @@ private[spark] class TaskSetManager(
             "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
               .format(index, taskSet.id, info.host, threshold))
           speculatableTasks += index
+          sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
           foundTasks = true
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index b9ce71a..7da4bae 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors when speculative tasks added") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+
+    // Verify that we're capped at number of tasks including the speculative ones in the stage
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    assert(numExecutorsTarget(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(addExecutors(manager) === 1)
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1))
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 3)
+    assert(numExecutorsToAdd(manager) === 4)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 5)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a task doesn't affect the target
+    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
+    assert(numExecutorsTarget(manager) === 5)
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Verify that running a speculative task doesn't affect the target
+    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
+    assert(numExecutorsTarget(manager) === 5)
+    assert(addExecutors(manager) === 0)
+    assert(numExecutorsToAdd(manager) === 1)
+  }
+
   test("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
@@ -1031,10 +1065,15 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
       taskLocalityPreferences = taskLocalityPreferences)
   }
 
-  private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
-    new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false)
+  private def createTaskInfo(
+      taskId: Int,
+      taskIndex: Int,
+      executorId: String,
+      speculative: Boolean = false): TaskInfo = {
+    new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative)
   }
 
+
   /* ------------------------------------------------------- *
    | Helper methods for accessing private methods and fields |
    * ------------------------------------------------------- */
@@ -1061,6 +1100,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
   private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
   private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
+  private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted)
 
   private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
     manager invokePrivate _numExecutorsToAdd()
@@ -1136,6 +1176,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _onExecutorBusy(id)
   }
 
+  private def onSpeculativeTaskSubmitted(manager: ExecutorAllocationManager, id: String) : Unit = {
+    manager invokePrivate _onSpeculativeTaskSubmitted(id)
+  }
+
   private def localityAwareTasks(manager: ExecutorAllocationManager): Int = {
     manager invokePrivate _localityAwareTasks()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d58a3507/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 6f1663b..ae43f4c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -60,6 +60,10 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
       exception: Option[Throwable]): Unit = {
     taskScheduler.taskSetsFailed += taskSet.id
   }
+
+  override def speculativeTaskSubmitted(task: Task[_]): Unit = {
+    taskScheduler.speculativeTasks += task.partitionId
+  }
 }
 
 // Get the rack for a given host
@@ -92,6 +96,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
   val endedTasks = new mutable.HashMap[Long, TaskEndReason]
   val finishedManagers = new ArrayBuffer[TaskSetManager]
   val taskSetsFailed = new ArrayBuffer[String]
+  val speculativeTasks = new ArrayBuffer[Int]
 
   val executors = new mutable.HashMap[String, String]
   for ((execId, host) <- liveExecutors) {
@@ -139,6 +144,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
     }
   }
 
+
   override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
 }
 
@@ -929,6 +935,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // > 0ms, so advance the clock by 1ms here.
     clock.advance(1)
     assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(3))
+
     // Offer resource to start the speculative attempt for the running task
     val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
     assert(taskOption5.isDefined)
@@ -1016,6 +1024,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // > 0ms, so advance the clock by 1ms here.
     clock.advance(1)
     assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(3, 4))
     // Offer resource to start the speculative attempt for the running task
     val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
     assert(taskOption5.isDefined)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org