Posted to commits@spark.apache.org by ka...@apache.org on 2015/02/04 07:30:36 UTC

spark git commit: [SPARK-4939] revive offers periodically in LocalBackend

Repository: spark
Updated Branches:
  refs/heads/master 242b4f02d -> 83de71c45


[SPARK-4939] revive offers periodically in LocalBackend

The locality timeout assumes that the SchedulerBackend revives offers periodically, but currently LocalBackend does not do that, so some jobs with mixed locality levels in local mode will hang forever.

This PR lets LocalBackend revive offers periodically, just like in cluster mode.
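For illustration, here is a minimal sketch of the pattern this change applies: when no task could be launched, the actor uses Akka's scheduleOnce to re-send itself a ReviveOffers message after a delay. The actor and the tryLaunchTasks helper below are illustrative stand-ins, not the actual Spark code:

    import scala.concurrent.duration._

    import akka.actor.Actor

    case object ReviveOffers

    class RevivingActor extends Actor {
      import context.dispatcher  // implicit ExecutionContext for scheduleOnce

      def receive = {
        case ReviveOffers =>
          if (!tryLaunchTasks()) {
            // Nothing could be launched yet (e.g. the scheduler is still
            // waiting out a locality timeout), so try again in one second.
            context.system.scheduler.scheduleOnce(1000.millis, self, ReviveOffers)
          }
      }

      // Illustrative stand-in for the real resourceOffers/launchTask logic.
      private def tryLaunchTasks(): Boolean = false
    }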

Author: Davies Liu <da...@databricks.com>

Closes #4147 from davies/revive and squashes the following commits:

2acdf9d [Davies Liu] Update LocalBackend.scala
3c8ca7c [Davies Liu] Update LocalBackend.scala
d1b60d2 [Davies Liu] address comments from Kay
33ac9bb [Davies Liu] fix build
d0da0d5 [Davies Liu] Merge branch 'master' of github.com:apache/spark into revive
6cf5972 [Davies Liu] fix thread-safety
ed62a31 [Davies Liu] fix scala style
df9008b [Davies Liu] fix typo
bfc1396 [Davies Liu] revive offers periodically in LocalBackend


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83de71c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83de71c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83de71c4

Branch: refs/heads/master
Commit: 83de71c45bb9f22049243dd7518b679c4e13c2df
Parents: 242b4f0
Author: Davies Liu <da...@databricks.com>
Authored: Tue Feb 3 22:30:23 2015 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Tue Feb 3 22:30:23 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/local/LocalBackend.scala  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/83de71c4/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 05b6fa5..4676b82 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.local
 
 import java.nio.ByteBuffer
 
+import scala.concurrent.duration._
+
 import akka.actor.{Actor, ActorRef, Props}
 
 import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
@@ -46,6 +48,8 @@ private[spark] class LocalActor(
     private val totalCores: Int)
   extends Actor with ActorLogReceive with Logging {
 
+  import context.dispatcher   // to use Akka's scheduler.scheduleOnce()
+
   private var freeCores = totalCores
 
   private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -74,11 +78,16 @@ private[spark] class LocalActor(
 
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
-    for (task <- scheduler.resourceOffers(offers).flatten) {
+    val tasks = scheduler.resourceOffers(offers).flatten
+    for (task <- tasks) {
       freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
         task.name, task.serializedTask)
     }
+    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
+      // Try to revive offers after 1 second, because the scheduler may be waiting on a locality timeout
+      context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
+    }
   }
 }
 

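As a design note, the change above re-arms a one-shot timer only while task sets remain active but nothing could be launched, instead of running an unconditional periodic timer. For comparison, here is a hedged sketch of the periodic alternative, resembling what cluster-mode backends do with their revive interval (the actor name below is illustrative, not Spark's actual code):

    import scala.concurrent.duration._

    import akka.actor.{Actor, Cancellable}

    case object ReviveOffers

    class PeriodicallyRevivingActor extends Actor {
      import context.dispatcher  // implicit ExecutionContext for the scheduler

      private var revives: Cancellable = _

      override def preStart(): Unit = {
        // Unconditionally revive every second, whether or not work is queued.
        revives = context.system.scheduler.schedule(1.second, 1.second, self, ReviveOffers)
      }

      override def postStop(): Unit = revives.cancel()

      def receive = {
        case ReviveOffers => // offer free cores to the scheduler here
      }
    }

The one-shot approach in this commit has the advantage of leaving the actor idle once there is nothing left to schedule.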

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org