You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2015/02/04 07:30:36 UTC
spark git commit: [SPARK-4939] revive offers periodically in
LocalBackend
Repository: spark
Updated Branches:
refs/heads/master 242b4f02d -> 83de71c45
[SPARK-4939] revive offers periodically in LocalBackend
The locality timeout assume that the SchedulerBackend can revive offers periodically, but currently LocalBackend did do that, then some job with mixed locality levels in local mode will hang forever.
This PR let LocalBackend revive offers periodically, just like in cluster mode.
Author: Davies Liu <da...@databricks.com>
Closes #4147 from davies/revive and squashes the following commits:
2acdf9d [Davies Liu] Update LocalBackend.scala
3c8ca7c [Davies Liu] Update LocalBackend.scala
d1b60d2 [Davies Liu] address comments from Kay
33ac9bb [Davies Liu] fix build
d0da0d5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into revive
6cf5972 [Davies Liu] fix thread-safety
ed62a31 [Davies Liu] fix scala style
df9008b [Davies Liu] fix typo
bfc1396 [Davies Liu] revive offers periodically in LocalBackend
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83de71c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83de71c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83de71c4
Branch: refs/heads/master
Commit: 83de71c45bb9f22049243dd7518b679c4e13c2df
Parents: 242b4f0
Author: Davies Liu <da...@databricks.com>
Authored: Tue Feb 3 22:30:23 2015 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Tue Feb 3 22:30:23 2015 -0800
----------------------------------------------------------------------
.../org/apache/spark/scheduler/local/LocalBackend.scala | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/83de71c4/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 05b6fa5..4676b82 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
+import scala.concurrent.duration._
+
import akka.actor.{Actor, ActorRef, Props}
import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
@@ -46,6 +48,8 @@ private[spark] class LocalActor(
private val totalCores: Int)
extends Actor with ActorLogReceive with Logging {
+ import context.dispatcher // to use Akka's scheduler.scheduleOnce()
+
private var freeCores = totalCores
private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -74,11 +78,16 @@ private[spark] class LocalActor(
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
- for (task <- scheduler.resourceOffers(offers).flatten) {
+ val tasks = scheduler.resourceOffers(offers).flatten
+ for (task <- tasks) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
task.name, task.serializedTask)
}
+ if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
+ // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
+ context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org