You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/10 02:53:31 UTC

[1/2] git commit: replace the thread with an Akka scheduler

Updated Branches:
  refs/heads/master 83bf1920c -> 87954d4c8


replace the thread with an Akka scheduler


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/28115fa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/28115fa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/28115fa8

Branch: refs/heads/master
Commit: 28115fa8cb942c907a90e48ee1171f2a9b698411
Parents: dd63c54
Author: soulmachine <so...@gmail.com>
Authored: Sat Nov 9 22:38:27 2013 +0800
Committer: soulmachine <so...@gmail.com>
Committed: Sat Nov 9 22:38:27 2013 +0800

----------------------------------------------------------------------
 .../scheduler/cluster/ClusterScheduler.scala    | 23 +++++++-------------
 1 file changed, 8 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/28115fa8/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 8503395..53a5896 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -25,6 +25,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 
+import akka.util.duration._
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler._
@@ -119,21 +121,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     backend.start()
 
     if (System.getProperty("spark.speculation", "false").toBoolean) {
-      new Thread("ClusterScheduler speculation check") {
-        setDaemon(true)
-
-        override def run() {
-          logInfo("Starting speculative execution thread")
-          while (true) {
-            try {
-              Thread.sleep(SPECULATION_INTERVAL)
-            } catch {
-              case e: InterruptedException => {}
-            }
-            checkSpeculatableTasks()
-          }
-        }
-      }.start()
+      logInfo("Starting speculative execution thread")
+
+      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
+            SPECULATION_INTERVAL milliseconds) {
+        checkSpeculatableTasks()
+      }
     }
   }
 


[2/2] git commit: Merge pull request #154 from soulmachine/ClusterScheduler

Posted by ma...@apache.org.
Merge pull request #154 from soulmachine/ClusterScheduler

Replace the thread inside ClusterScheduler.start() with an Akka scheduler

Threads are precious resources, so we shouldn't abuse them


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/87954d4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/87954d4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/87954d4c

Branch: refs/heads/master
Commit: 87954d4c85f6ff2c1392880a611792a35b5d8fae
Parents: 83bf192 28115fa
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Sat Nov 9 17:53:25 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Sat Nov 9 17:53:25 2013 -0800

----------------------------------------------------------------------
 .../scheduler/cluster/ClusterScheduler.scala    | 23 +++++++-------------
 1 file changed, 8 insertions(+), 15 deletions(-)
----------------------------------------------------------------------