You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/05/19 01:55:49 UTC
spark git commit: [SPARK-7624] Revert #4147
Repository: spark
Updated Branches:
refs/heads/master eb4632f28 -> 4fb52f954
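[SPARK-7624] Revert #4147 (removes the local-revive-thread and the 1-second delayed ReviveOffers retry from LocalEndpoint in LocalBackend.scala)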
Author: Davies Liu <da...@databricks.com>
Closes #6172 from davies/revert_4147 and squashes the following commits:
3bfbbde [Davies Liu] Revert #4147
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fb52f95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fb52f95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fb52f95
Branch: refs/heads/master
Commit: 4fb52f9545ae338fae2d3aeea4bfc35d5df44853
Parents: eb4632f
Author: Davies Liu <da...@databricks.com>
Authored: Mon May 18 16:55:45 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon May 18 16:55:45 2015 -0700
----------------------------------------------------------------------
.../spark/scheduler/local/LocalBackend.scala | 23 ++------------------
1 file changed, 2 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4fb52f95/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index e64d06c..3078a1b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -18,14 +18,12 @@
package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.{ThreadUtils, Utils}
private case class ReviveOffers()
@@ -47,9 +45,6 @@ private[spark] class LocalEndpoint(
private val totalCores: Int)
extends ThreadSafeRpcEndpoint with Logging {
- private val reviveThread =
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
-
private var freeCores = totalCores
private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
@@ -79,27 +74,13 @@ private[spark] class LocalEndpoint(
context.reply(true)
}
-
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
- val tasks = scheduler.resourceOffers(offers).flatten
- for (task <- tasks) {
+ for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
task.name, task.serializedTask)
}
- if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
- // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
- reviveThread.schedule(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- Option(self).foreach(_.send(ReviveOffers))
- }
- }, 1000, TimeUnit.MILLISECONDS)
- }
- }
-
- override def onStop(): Unit = {
- reviveThread.shutdownNow()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org