You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/05/19 01:55:49 UTC

spark git commit: [SPARK-7624] Revert #4147

Repository: spark
Updated Branches:
  refs/heads/master eb4632f28 -> 4fb52f954


[SPARK-7624] Revert #4147

Author: Davies Liu <da...@databricks.com>

Closes #6172 from davies/revert_4147 and squashes the following commits:

3bfbbde [Davies Liu] Revert #4147


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fb52f95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fb52f95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fb52f95

Branch: refs/heads/master
Commit: 4fb52f9545ae338fae2d3aeea4bfc35d5df44853
Parents: eb4632f
Author: Davies Liu <da...@databricks.com>
Authored: Mon May 18 16:55:45 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon May 18 16:55:45 2015 -0700

----------------------------------------------------------------------
 .../spark/scheduler/local/LocalBackend.scala    | 23 ++------------------
 1 file changed, 2 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4fb52f95/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index e64d06c..3078a1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -18,14 +18,12 @@
 package org.apache.spark.scheduler.local
 
 import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.{ThreadUtils, Utils}
 
 private case class ReviveOffers()
 
@@ -47,9 +45,6 @@ private[spark] class LocalEndpoint(
     private val totalCores: Int)
   extends ThreadSafeRpcEndpoint with Logging {
 
-  private val reviveThread =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
-
   private var freeCores = totalCores
 
   private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -79,27 +74,13 @@ private[spark] class LocalEndpoint(
       context.reply(true)
   }
 
-
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
-    val tasks = scheduler.resourceOffers(offers).flatten
-    for (task <- tasks) {
+    for (task <- scheduler.resourceOffers(offers).flatten) {
       freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
         task.name, task.serializedTask)
     }
-    if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
-      // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
-      reviveThread.schedule(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          Option(self).foreach(_.send(ReviveOffers))
-        }
-      }, 1000, TimeUnit.MILLISECONDS)
-    }
-  }
-
-  override def onStop(): Unit = {
-    reviveThread.shutdownNow()
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org