You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/14 01:49:59 UTC

[1/5] git commit: Replaced the daemon thread started by DAGScheduler with an actor

Updated Branches:
  refs/heads/master 9290e5bcd -> 2054c61a1


Replaced the daemon thread started by DAGScheduler with an actor


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2539c067
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2539c067
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2539c067

Branch: refs/heads/master
Commit: 2539c0674501432fb62073577db6da52a26db850
Parents: bf4e613
Author: Lian, Cheng <rh...@gmail.com>
Authored: Sat Nov 9 19:05:18 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Sat Nov 9 19:05:18 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   1 -
 .../apache/spark/scheduler/DAGScheduler.scala   | 105 ++++++++-----------
 .../org/apache/spark/storage/BlockManager.scala |   2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   2 +-
 4 files changed, 45 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2539c067/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 880b49e..03ffcc6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -238,7 +238,6 @@ class SparkContext(
   taskScheduler.start()
 
   @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
-  dagScheduler.start()
 
   ui.start()
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2539c067/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d0b21e8..cb19969 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -19,9 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.util.Properties
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
+import akka.actor.{Props, Actor, ActorRef}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 
 import org.apache.spark._
@@ -65,12 +65,12 @@ class DAGScheduler(
 
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
-    eventQueue.put(BeginEvent(task, taskInfo))
+    eventProcessActor ! BeginEvent(task, taskInfo)
   }
 
   // Called to report that a task has completed and results are being fetched remotely.
   def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
-    eventQueue.put(GettingResultEvent(task, taskInfo))
+    eventProcessActor ! GettingResultEvent(task, taskInfo)
   }
 
   // Called by TaskScheduler to report task completions or failures.
@@ -81,23 +81,23 @@ class DAGScheduler(
       accumUpdates: Map[Long, Any],
       taskInfo: TaskInfo,
       taskMetrics: TaskMetrics) {
-    eventQueue.put(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
+    eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
   }
 
   // Called by TaskScheduler when an executor fails.
   def executorLost(execId: String) {
-    eventQueue.put(ExecutorLost(execId))
+    eventProcessActor ! ExecutorLost(execId)
   }
 
   // Called by TaskScheduler when a host is added
   def executorGained(execId: String, host: String) {
-    eventQueue.put(ExecutorGained(execId, host))
+    eventProcessActor ! ExecutorGained(execId, host)
   }
 
   // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
   // cancellation of the job itself.
   def taskSetFailed(taskSet: TaskSet, reason: String) {
-    eventQueue.put(TaskSetFailed(taskSet, reason))
+    eventProcessActor ! TaskSetFailed(taskSet, reason)
   }
 
   // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
@@ -109,7 +109,36 @@ class DAGScheduler(
   // resubmit failed stages
   val POLL_TIMEOUT = 10L
 
-  private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
+  private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+    /**
+     * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
+     * events and responds by launching tasks. This runs in a dedicated thread and receives events
+     * via the eventQueue.
+     */
+    def receive = {
+      case event: DAGSchedulerEvent =>
+        if (event != null) {
+          logDebug("Got event of type " + event.getClass.getName)
+        }
+
+        if (!processEvent(event)) {
+          val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
+          // Periodically resubmit failed stages if some map output fetches have failed and we have
+          // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
+          // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
+          // the same time, so we want to make sure we've identified all the reduce tasks that depend
+          // on the failed node.
+          if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
+            resubmitFailedStages()
+          } else {
+            submitWaitingStages()
+          }
+        }
+        else {
+          context.stop(self)
+        }
+    }
+  }))
 
   private[scheduler] val nextJobId = new AtomicInteger(0)
 
@@ -150,16 +179,6 @@ class DAGScheduler(
 
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
 
-  // Start a thread to run the DAGScheduler event loop
-  def start() {
-    new Thread("DAGScheduler") {
-      setDaemon(true)
-      override def run() {
-        DAGScheduler.this.run()
-      }
-    }.start()
-  }
-
   def addSparkListener(listener: SparkListener) {
     listenerBus.addListener(listener)
   }
@@ -301,8 +320,7 @@ class DAGScheduler(
     assert(partitions.size > 0)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
-    eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite,
-      waiter, properties))
+    eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
     waiter
   }
 
@@ -337,8 +355,7 @@ class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
     val jobId = nextJobId.getAndIncrement()
-    eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite,
-      listener, properties))
+    eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
     listener.awaitResult()    // Will throw an exception if the job fails
   }
 
@@ -347,19 +364,19 @@ class DAGScheduler(
    */
   def cancelJob(jobId: Int) {
     logInfo("Asked to cancel job " + jobId)
-    eventQueue.put(JobCancelled(jobId))
+    eventProcessActor ! JobCancelled(jobId)
   }
 
   def cancelJobGroup(groupId: String) {
     logInfo("Asked to cancel job group " + groupId)
-    eventQueue.put(JobGroupCancelled(groupId))
+    eventProcessActor ! JobGroupCancelled(groupId)
   }
 
   /**
    * Cancel all jobs that are running or waiting in the queue.
    */
   def cancelAllJobs() {
-    eventQueue.put(AllJobsCancelled)
+    eventProcessActor ! AllJobsCancelled
   }
 
   /**
@@ -474,42 +491,6 @@ class DAGScheduler(
     }
   }
 
-
-  /**
-   * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
-   * events and responds by launching tasks. This runs in a dedicated thread and receives events
-   * via the eventQueue.
-   */
-  private def run() {
-    SparkEnv.set(env)
-
-    while (true) {
-      val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)
-      if (event != null) {
-        logDebug("Got event of type " + event.getClass.getName)
-      }
-      this.synchronized { // needed in case other threads makes calls into methods of this class
-        if (event != null) {
-          if (processEvent(event)) {
-            return
-          }
-        }
-
-        val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
-        // Periodically resubmit failed stages if some map output fetches have failed and we have
-        // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
-        // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
-        // the same time, so we want to make sure we've identified all the reduce tasks that depend
-        // on the failed node.
-        if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
-          resubmitFailedStages()
-        } else {
-          submitWaitingStages()
-        }
-      }
-    }
-  }
-
   /**
    * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
    * We run the operation in a separate thread just in case it takes a bunch of time, so that we
@@ -909,7 +890,7 @@ class DAGScheduler(
   }
 
   def stop() {
-    eventQueue.put(StopDAGScheduler)
+    eventProcessActor ! StopDAGScheduler
     metadataCleaner.cancel()
     taskSched.stop()
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2539c067/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a34c95b..2c21134 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -893,7 +893,7 @@ private[spark] object BlockManager extends Logging {
   {
     // env == null and blockManagerMaster != null is used in tests
     assert (env != null || blockManagerMaster != null)
-    val blockLocations: Seq[Seq[BlockManagerId]] = if (env != null) {
+    val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
       env.blockManager.getLocationBlockIds(blockIds)
     } else {
       blockManagerMaster.getLocations(blockIds)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2539c067/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 00f2fdd..a4d41eb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -100,7 +100,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     cacheLocations.clear()
     results.clear()
     mapOutputTracker = new MapOutputTrackerMaster()
-    scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) {
+    scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) {
       override def runLocally(job: ActiveJob) {
         // don't bother with the thread while unit testing
         runLocallyWithinThread(job)


[3/5] git commit: Put the periodical resubmitFailedStages() call into a scheduled task

Posted by ma...@apache.org.
Put the periodical resubmitFailedStages() call into a scheduled task


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ba552851
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ba552851
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ba552851

Branch: refs/heads/master
Commit: ba552851771cf8eaf90b72b661c3df60080d0ef9
Parents: 765ebca
Author: Lian, Cheng <rh...@gmail.com>
Authored: Mon Nov 11 01:25:35 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Mon Nov 11 01:25:35 2013 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 28 +++++++++-----------
 1 file changed, 12 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba552851/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a73a6e1..7499570 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,8 @@ import java.io.NotSerializableException
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
-import akka.actor.{Props, Actor, ActorRef}
+import akka.actor._
+import akka.util.duration._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 
 import org.apache.spark._
@@ -110,6 +111,13 @@ class DAGScheduler(
   val POLL_TIMEOUT = 10L
 
   private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+    override def preStart() {
+      env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
+        if (failed.size > 0)
+          resubmitFailedStages()
+      }
+    }
+
     /**
      * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
      * events and responds by launching tasks. This runs in a dedicated thread and receives events
@@ -119,22 +127,10 @@ class DAGScheduler(
       case event: DAGSchedulerEvent =>
         logDebug("Got event of type " + event.getClass.getName)
 
-        if (!processEvent(event)) {
-          val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
-          // Periodically resubmit failed stages if some map output fetches have failed and we have
-          // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
-          // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
-          // the same time, so we want to make sure we've identified all the reduce tasks that depend
-          // on the failed node.
-          if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
-            resubmitFailedStages()
-          } else {
-            submitWaitingStages()
-          }
-        }
-        else {
+        if (!processEvent(event))
+          submitWaitingStages()
+        else
           context.stop(self)
-        }
     }
   }))
 


[4/5] git commit: Made some changes according to suggestions from @aarondav

Posted by ma...@apache.org.
Made some changes according to suggestions from @aarondav


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e2a43b3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e2a43b3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e2a43b3d

Branch: refs/heads/master
Commit: e2a43b3dcce81fc99098510d09095e1be4bf3e29
Parents: ba55285
Author: Lian, Cheng <rh...@gmail.com>
Authored: Mon Nov 11 12:21:54 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Mon Nov 11 12:21:54 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala     | 9 +++++----
 .../main/scala/org/apache/spark/storage/BlockManager.scala  | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2a43b3d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7499570..42bb388 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -112,9 +112,10 @@ class DAGScheduler(
 
   private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
     override def preStart() {
-      env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
-        if (failed.size > 0)
+      context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
+        if (failed.size > 0) {
           resubmitFailedStages()
+        }
       }
     }
 
@@ -853,7 +854,7 @@ class DAGScheduler(
     // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
     // that has any placement preferences. Ideally we would choose based on transfer sizes,
     // but this will do for now.
-    rdd.dependencies.foreach(_ match {
+    rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
         for (inPart <- n.getParents(partition)) {
           val locs = getPreferredLocs(n.rdd, inPart)
@@ -861,7 +862,7 @@ class DAGScheduler(
             return locs
         }
       case _ =>
-    })
+    }
     Nil
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e2a43b3d/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2c21134..702aca8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -891,7 +891,7 @@ private[spark] object BlockManager extends Logging {
       blockManagerMaster: BlockManagerMaster = null)
   : Map[BlockId, Seq[BlockManagerId]] =
   {
-    // env == null and blockManagerMaster != null is used in tests
+    // blockManagerMaster != null is used in tests
     assert (env != null || blockManagerMaster != null)
     val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
       env.blockManager.getLocationBlockIds(blockIds)


[2/5] git commit: Remove unnecessary null checking

Posted by ma...@apache.org.
Remove unnecessary null checking


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/765ebca0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/765ebca0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/765ebca0

Branch: refs/heads/master
Commit: 765ebca04f3dce1685c64022425bd281993be90e
Parents: 2539c06
Author: Lian, Cheng <rh...@gmail.com>
Authored: Sat Nov 9 21:13:03 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Sat Nov 9 21:13:03 2013 +0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/765ebca0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index cb19969..a73a6e1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -117,9 +117,7 @@ class DAGScheduler(
      */
     def receive = {
       case event: DAGSchedulerEvent =>
-        if (event != null) {
-          logDebug("Got event of type " + event.getClass.getName)
-        }
+        logDebug("Got event of type " + event.getClass.getName)
 
         if (!processEvent(event)) {
           val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability


[5/5] git commit: Merge pull request #159 from liancheng/dagscheduler-actor-refine

Posted by ma...@apache.org.
Merge pull request #159 from liancheng/dagscheduler-actor-refine

Migrate the daemon thread started by DAGScheduler to Akka actor

`DAGScheduler` uses an event queue and a daemon thread that polls it to process events sent to the `DAGScheduler`.  This is a classic actor use case.  By migrating this thread to an Akka actor, we may benefit from both cleaner code and better performance (the context-switching cost of an Akka actor is much lower than that of a native thread).

But things become a little complicated when taking existing test code into consideration.

Code in `DAGSchedulerSuite` is somewhat tightly coupled with `DAGScheduler`, and directly calls `DAGScheduler.processEvent` instead of posting event messages to `DAGScheduler`.  To minimize code change, I chose to let the actor delegate messages to `processEvent`.  Maybe this doesn't follow conventional actor usage, but I tried to make it obviously correct.

Another tricky part is that, since `DAGScheduler` depends on the `ActorSystem` provided by its field `env`, `env` cannot be null.  But the `dagScheduler` field created in `DAGSchedulerSuite.before` was given a null `env`.  What's more, `BlockManager.blockIdsToBlockManagers` checks whether `env` is null to determine whether to run the production code or the test code (bad smell here, huh?).  I went through all callers of `BlockManager.blockIdsToBlockManagers`, and made sure that if `env != null` holds, then `blockManagerMaster == null` must also hold.  That's the logic behind `BlockManager.scala` [line 896](https://github.com/liancheng/incubator-spark/compare/dagscheduler-actor-refine?expand=1#diff-2b643ea78c1add0381754b1f47eec132L896).

Finally, since `DAGScheduler` instances are always `start()`ed after creation, I removed the `start()` method and now start the `eventProcessActor` within the constructor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2054c61a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2054c61a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2054c61a

Branch: refs/heads/master
Commit: 2054c61a18c277c00661b89bbae365470c297031
Parents: 9290e5b e2a43b3
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Wed Nov 13 16:49:55 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Wed Nov 13 16:49:55 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   1 -
 .../apache/spark/scheduler/DAGScheduler.scala   | 104 +++++++------------
 .../org/apache/spark/storage/BlockManager.scala |   4 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   2 +-
 4 files changed, 43 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2054c61a/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------