Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:41:57 UTC

[13/50] [abbrv] git commit: Merge branch 'scala210-master' of github.com:colorant/incubator-spark into scala-2.10

Merge branch 'scala210-master' of github.com:colorant/incubator-spark into scala-2.10

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/client/Client.scala
	core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/199e9cf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/199e9cf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/199e9cf0

Branch: refs/heads/master
Commit: 199e9cf02dfaa372c1f067bca54556e1f6ce787d
Parents: 6860b79 f6b2e59
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Thu Nov 21 11:55:48 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Thu Nov 21 11:55:48 2013 +0530

----------------------------------------------------------------------
 README.md                                       |    2 +-
 bin/compute-classpath.sh                        |   22 +-
 bin/slaves.sh                                   |   19 +-
 bin/spark-daemon.sh                             |   21 +-
 bin/spark-daemons.sh                            |    2 +-
 bin/stop-slaves.sh                              |    2 -
 core/pom.xml                                    |    4 +
 .../spark/network/netty/FileClientHandler.java  |    3 +-
 .../spark/network/netty/FileServerHandler.java  |   23 +-
 .../spark/network/netty/PathResolver.java       |   11 +-
 .../hadoop/mapred/SparkHadoopMapRedUtil.scala   |   17 +-
 .../mapreduce/SparkHadoopMapReduceUtil.scala    |   33 +-
 .../scala/org/apache/spark/Aggregator.scala     |   49 +-
 .../apache/spark/BlockStoreShuffleFetcher.scala |   23 +-
 .../scala/org/apache/spark/CacheManager.scala   |   12 +-
 .../scala/org/apache/spark/FutureAction.scala   |  250 +++++
 .../apache/spark/InterruptibleIterator.scala    |   30 +
 .../org/apache/spark/MapOutputTracker.scala     |  169 +--
 .../scala/org/apache/spark/ShuffleFetcher.scala |    5 +-
 .../scala/org/apache/spark/SparkContext.scala   |  234 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  |   25 +-
 .../org/apache/spark/SparkHadoopWriter.scala    |   21 +-
 .../scala/org/apache/spark/TaskContext.scala    |   21 +-
 .../scala/org/apache/spark/TaskEndReason.scala  |    2 +
 .../apache/spark/api/java/JavaDoubleRDD.scala   |   24 +
 .../org/apache/spark/api/java/JavaPairRDD.scala |   35 +
 .../org/apache/spark/api/java/JavaRDD.scala     |   19 +
 .../java/function/DoubleFlatMapFunction.java    |   10 +-
 .../spark/api/java/function/DoubleFunction.java |    3 +-
 .../api/java/function/FlatMapFunction.scala     |    3 -
 .../api/java/function/FlatMapFunction2.scala    |    3 -
 .../spark/api/java/function/Function.java       |    4 +-
 .../spark/api/java/function/Function2.java      |    2 -
 .../spark/api/java/function/Function3.java      |   36 +
 .../api/java/function/PairFlatMapFunction.java  |    2 -
 .../spark/api/java/function/PairFunction.java   |    5 +-
 .../api/java/function/WrappedFunction3.scala    |   34 +
 .../org/apache/spark/api/python/PythonRDD.scala |    2 +-
 .../spark/broadcast/BitTorrentBroadcast.scala   | 1058 ------------------
 .../apache/spark/broadcast/HttpBroadcast.scala  |   13 +-
 .../apache/spark/broadcast/MultiTracker.scala   |  410 -------
 .../org/apache/spark/broadcast/SourceInfo.scala |   54 -
 .../spark/broadcast/TorrentBroadcast.scala      |  247 ++++
 .../apache/spark/broadcast/TreeBroadcast.scala  |  603 ----------
 .../org/apache/spark/deploy/DeployMessage.scala |   29 +-
 .../spark/deploy/ExecutorDescription.scala      |   34 +
 .../spark/deploy/FaultToleranceTest.scala       |  420 +++++++
 .../org/apache/spark/deploy/JsonProtocol.scala  |    3 +-
 .../apache/spark/deploy/LocalSparkCluster.scala |    7 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |   60 +-
 .../org/apache/spark/deploy/client/Client.scala |   74 +-
 .../spark/deploy/client/ClientListener.scala    |    4 +
 .../apache/spark/deploy/client/TestClient.scala |    7 +-
 .../spark/deploy/master/ApplicationInfo.scala   |   53 +-
 .../spark/deploy/master/ApplicationState.scala  |    5 +-
 .../spark/deploy/master/ExecutorInfo.scala      |    7 +-
 .../master/FileSystemPersistenceEngine.scala    |   90 ++
 .../deploy/master/LeaderElectionAgent.scala     |   45 +
 .../org/apache/spark/deploy/master/Master.scala |  256 ++++-
 .../spark/deploy/master/MasterMessages.scala    |   46 +
 .../spark/deploy/master/PersistenceEngine.scala |   53 +
 .../spark/deploy/master/RecoveryState.scala     |   26 +
 .../deploy/master/SparkZooKeeperSession.scala   |  203 ++++
 .../apache/spark/deploy/master/WorkerInfo.scala |   42 +-
 .../spark/deploy/master/WorkerState.scala       |    2 +-
 .../master/ZooKeeperLeaderElectionAgent.scala   |  136 +++
 .../master/ZooKeeperPersistenceEngine.scala     |   85 ++
 .../spark/deploy/worker/ExecutorRunner.scala    |   15 +-
 .../org/apache/spark/deploy/worker/Worker.scala |  166 ++-
 .../spark/deploy/worker/WorkerArguments.scala   |    8 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |    2 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  126 +++
 .../org/apache/spark/executor/Executor.scala    |  173 ++-
 .../spark/executor/MesosExecutorBackend.scala   |   18 +-
 .../executor/StandaloneExecutorBackend.scala    |  119 --
 .../org/apache/spark/executor/TaskMetrics.scala |    5 +
 .../spark/network/ConnectionManager.scala       |    3 +-
 .../apache/spark/network/netty/FileHeader.scala |   22 +-
 .../spark/network/netty/ShuffleCopier.scala     |   27 +-
 .../spark/network/netty/ShuffleSender.scala     |    9 +-
 .../main/scala/org/apache/spark/package.scala   |    2 +
 .../org/apache/spark/rdd/AsyncRDDActions.scala  |  123 ++
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |    9 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |    9 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |   26 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  125 +--
 .../spark/rdd/MapPartitionsWithContextRDD.scala |   42 +
 .../spark/rdd/MapPartitionsWithIndexRDD.scala   |   42 -
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |   79 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   16 +-
 .../spark/rdd/ParallelCollectionRDD.scala       |    5 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  106 +-
 .../org/apache/spark/rdd/ShuffledRDD.scala      |    2 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |    2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  292 ++---
 .../spark/scheduler/DAGSchedulerEvent.scala     |   29 +-
 .../spark/scheduler/DAGSchedulerSource.scala    |    2 +-
 .../spark/scheduler/InputFormatInfo.scala       |    7 +-
 .../org/apache/spark/scheduler/JobLogger.scala  |  676 ++++++-----
 .../org/apache/spark/scheduler/JobWaiter.scala  |   62 +-
 .../scala/org/apache/spark/scheduler/Pool.scala |    5 +-
 .../org/apache/spark/scheduler/ResultTask.scala |   48 +-
 .../spark/scheduler/SchedulableBuilder.scala    |    3 +
 .../apache/spark/scheduler/ShuffleMapTask.scala |   76 +-
 .../apache/spark/scheduler/SparkListener.scala  |   21 +-
 .../spark/scheduler/SparkListenerBus.scala      |    2 +
 .../org/apache/spark/scheduler/Stage.scala      |    6 +-
 .../org/apache/spark/scheduler/StageInfo.scala  |   13 +-
 .../scala/org/apache/spark/scheduler/Task.scala |   63 +-
 .../org/apache/spark/scheduler/TaskInfo.scala   |   20 +
 .../org/apache/spark/scheduler/TaskResult.scala |    3 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |   10 +-
 .../spark/scheduler/TaskSchedulerListener.scala |   44 -
 .../org/apache/spark/scheduler/TaskSet.scala    |    4 +
 .../scheduler/cluster/ClusterScheduler.scala    |   79 +-
 .../cluster/ClusterTaskSetManager.scala         |  106 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   69 ++
 .../cluster/CoarseGrainedSchedulerBackend.scala |  233 ++++
 .../scheduler/cluster/SchedulerBackend.scala    |    6 +-
 .../cluster/SimrSchedulerBackend.scala          |   78 ++
 .../cluster/SparkDeploySchedulerBackend.scala   |   20 +-
 .../cluster/StandaloneClusterMessage.scala      |   63 --
 .../cluster/StandaloneSchedulerBackend.scala    |  195 ----
 .../scheduler/cluster/TaskResultGetter.scala    |   26 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |   16 +-
 .../spark/scheduler/local/LocalScheduler.scala  |  196 ++--
 .../scheduler/local/LocalTaskSetManager.scala   |   24 +-
 .../spark/serializer/KryoSerializer.scala       |   52 +-
 .../apache/spark/storage/BlockException.scala   |    2 +-
 .../spark/storage/BlockFetcherIterator.scala    |   24 +-
 .../org/apache/spark/storage/BlockId.scala      |  103 ++
 .../org/apache/spark/storage/BlockInfo.scala    |   81 ++
 .../org/apache/spark/storage/BlockManager.scala |  632 ++++-------
 .../spark/storage/BlockManagerMaster.scala      |    8 +-
 .../spark/storage/BlockManagerMasterActor.scala |   25 +-
 .../spark/storage/BlockManagerMessages.scala    |   16 +-
 .../spark/storage/BlockManagerSlaveActor.scala  |    1 +
 .../spark/storage/BlockManagerWorker.scala      |    4 +-
 .../org/apache/spark/storage/BlockMessage.scala |   38 +-
 .../spark/storage/BlockMessageArray.scala       |    7 +-
 .../spark/storage/BlockObjectWriter.scala       |  142 ++-
 .../org/apache/spark/storage/BlockStore.scala   |   14 +-
 .../apache/spark/storage/DiskBlockManager.scala |  151 +++
 .../org/apache/spark/storage/DiskStore.scala    |  280 +----
 .../org/apache/spark/storage/FileSegment.scala  |   28 +
 .../org/apache/spark/storage/MemoryStore.scala  |   34 +-
 .../spark/storage/ShuffleBlockManager.scala     |  200 +++-
 .../spark/storage/StoragePerfTester.scala       |   86 ++
 .../org/apache/spark/storage/StorageUtils.scala |   47 +-
 .../apache/spark/storage/ThreadingTest.scala    |    6 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala   |    2 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala    |    2 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  105 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |    8 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   48 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   |   33 +-
 .../org/apache/spark/ui/storage/RDDPage.scala   |   23 +-
 .../org/apache/spark/util/AppendOnlyMap.scala   |  230 ++++
 .../org/apache/spark/util/MetadataCleaner.scala |   37 +-
 .../scala/org/apache/spark/util/Utils.scala     |   48 +-
 .../apache/spark/util/collection/BitSet.scala   |  103 ++
 .../spark/util/collection/OpenHashMap.scala     |  153 +++
 .../spark/util/collection/OpenHashSet.scala     |  272 +++++
 .../collection/PrimitiveKeyOpenHashMap.scala    |  128 +++
 .../spark/util/collection/PrimitiveVector.scala |   53 +
 .../scala/org/apache/spark/BroadcastSuite.scala |   52 +-
 .../org/apache/spark/CacheManagerSuite.scala    |   21 +-
 .../org/apache/spark/CheckpointSuite.scala      |   10 +-
 .../org/apache/spark/DistributedSuite.scala     |   16 +-
 .../org/apache/spark/FileServerSuite.scala      |   16 +
 .../scala/org/apache/spark/JavaAPISuite.java    |   23 +-
 .../org/apache/spark/JobCancellationSuite.scala |  209 ++++
 .../apache/spark/MapOutputTrackerSuite.scala    |   20 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala |    7 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |   19 +
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala |  176 +++
 .../spark/rdd/PairRDDFunctionsSuite.scala       |    2 +-
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |   20 +
 .../spark/scheduler/DAGSchedulerSuite.scala     |   37 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |   17 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  136 ++-
 .../cluster/ClusterTaskSetManagerSuite.scala    |   49 +-
 .../spark/scheduler/cluster/FakeTask.scala      |    5 +-
 .../cluster/TaskResultGetterSuite.scala         |    3 +-
 .../scheduler/local/LocalSchedulerSuite.scala   |   28 +-
 .../org/apache/spark/storage/BlockIdSuite.scala |  114 ++
 .../spark/storage/BlockManagerSuite.scala       |  102 +-
 .../spark/storage/DiskBlockManagerSuite.scala   |   84 ++
 .../apache/spark/util/AppendOnlyMapSuite.scala  |  154 +++
 .../spark/util/collection/BitSetSuite.scala     |   73 ++
 .../util/collection/OpenHashMapSuite.scala      |  148 +++
 .../util/collection/OpenHashSetSuite.scala      |  145 +++
 .../PrimitiveKeyOpenHashSetSuite.scala          |   90 ++
 docker/README.md                                |    5 +
 docker/build                                    |   22 +
 docker/spark-test/README.md                     |   11 +
 docker/spark-test/base/Dockerfile               |   38 +
 docker/spark-test/build                         |   22 +
 docker/spark-test/master/Dockerfile             |   21 +
 docker/spark-test/master/default_cmd            |   22 +
 docker/spark-test/worker/Dockerfile             |   22 +
 docker/spark-test/worker/default_cmd            |   22 +
 docs/cluster-overview.md                        |   14 +-
 docs/configuration.md                           |   10 +-
 docs/ec2-scripts.md                             |    2 +-
 docs/hadoop-third-party-distributions.md        |    4 +-
 docs/python-programming-guide.md                |   11 +
 docs/running-on-yarn.md                         |   10 +-
 docs/scala-programming-guide.md                 |    6 +-
 docs/spark-standalone.md                        |   75 ++
 docs/streaming-programming-guide.md             |    9 +-
 docs/tuning.md                                  |    2 +-
 ec2/spark_ec2.py                                |   68 +-
 examples/pom.xml                                |   36 +-
 .../streaming/examples/JavaKafkaWordCount.java  |   98 ++
 .../apache/spark/examples/BroadcastTest.scala   |   15 +-
 .../org/apache/spark/examples/SparkHdfsLR.scala |    3 +-
 .../org/apache/spark/examples/SparkKMeans.scala |    2 -
 .../org/apache/spark/examples/SparkPi.scala     |    2 +-
 .../streaming/examples/KafkaWordCount.scala     |   28 +-
 .../streaming/examples/MQTTWordCount.scala      |  107 ++
 .../clickstream/PageViewGenerator.scala         |   13 +-
 pom.xml                                         |  132 ++-
 project/SparkBuild.scala                        |   35 +-
 project/plugins.sbt                             |    2 +-
 python/pyspark/accumulators.py                  |   13 +-
 python/pyspark/context.py                       |   50 +-
 .../org/apache/spark/repl/SparkILoop.scala      |   22 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala |   34 +-
 spark-class                                     |   28 +-
 spark-class2.cmd                                |    7 +
 .../kafka/0.7.2-spark/kafka-0.7.2-spark.jar     |  Bin 1358063 -> 0 bytes
 .../kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 |    1 -
 .../0.7.2-spark/kafka-0.7.2-spark.jar.sha1      |    1 -
 .../kafka/0.7.2-spark/kafka-0.7.2-spark.pom     |    9 -
 .../kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 |    1 -
 .../0.7.2-spark/kafka-0.7.2-spark.pom.sha1      |    1 -
 .../apache/kafka/kafka/maven-metadata-local.xml |   12 -
 .../kafka/kafka/maven-metadata-local.xml.md5    |    1 -
 .../kafka/kafka/maven-metadata-local.xml.sha1   |    1 -
 streaming/pom.xml                               |   57 +-
 .../org/apache/spark/streaming/Checkpoint.scala |    2 +
 .../org/apache/spark/streaming/DStream.scala    |   55 +-
 .../spark/streaming/NetworkInputTracker.scala   |   14 +-
 .../spark/streaming/PairDStreamFunctions.scala  |  154 ++-
 .../spark/streaming/StreamingContext.scala      |   52 +-
 .../spark/streaming/api/java/JavaDStream.scala  |    8 +-
 .../streaming/api/java/JavaDStreamLike.scala    |   97 +-
 .../streaming/api/java/JavaPairDStream.scala    |  186 ++-
 .../api/java/JavaStreamingContext.scala         |  108 +-
 .../streaming/dstream/CoGroupedDStream.scala    |   59 -
 .../streaming/dstream/KafkaInputDStream.scala   |   62 +-
 .../streaming/dstream/MQTTInputDStream.scala    |  110 ++
 .../streaming/dstream/NetworkInputDStream.scala |   18 +-
 .../streaming/dstream/RawInputDStream.scala     |    4 +-
 .../streaming/dstream/TransformedDStream.scala  |   20 +-
 .../streaming/receivers/ActorReceiver.scala     |    4 +-
 .../apache/spark/streaming/JavaAPISuite.java    |  425 ++++++-
 .../apache/spark/streaming/JavaTestUtils.scala  |   36 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  141 ++-
 .../spark/streaming/CheckpointSuite.scala       |    4 +-
 .../spark/streaming/InputStreamsSuite.scala     |   91 +-
 .../apache/spark/streaming/TestSuiteBase.scala  |   61 +-
 .../tools/JavaAPICompletenessChecker.scala      |    4 +-
 yarn/pom.xml                                    |   50 +
 .../spark/deploy/yarn/ApplicationMaster.scala   |   55 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  258 ++++-
 .../spark/deploy/yarn/ClientArguments.scala     |   25 +-
 .../yarn/ClientDistributedCacheManager.scala    |  228 ++++
 .../spark/deploy/yarn/WorkerRunnable.scala      |   83 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |    4 +-
 .../ClientDistributedCacheManagerSuite.scala    |  220 ++++
 272 files changed, 11974 insertions(+), 5805 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6590e97,035942a..a686b53
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@@ -60,9 -60,9 +60,9 @@@ private[spark] class MapOutputTracker e
    private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
  
    // Set to the MapOutputTrackerActor living on the driver
 -  var trackerActor: ActorRef = _
 +  var trackerActor: Either[ActorRef, ActorSelection] = _
  
-   private var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
+   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
  
    // Incremented every time a fetch fails so that client nodes know to clear
    // their cache of map output locations if this happens.
@@@ -77,13 -74,9 +74,13 @@@
  
    // Send a message to the trackerActor and get its result within a default timeout, or
    // throw a SparkException if this fails.
-   def askTracker(message: Any): Any = {
+   private def askTracker(message: Any): Any = {
      try {
 -      val future = trackerActor.ask(message)(timeout)
 +      val future = if (trackerActor.isLeft ) {
 +        trackerActor.left.get.ask(message)(timeout)
 +      } else {
 +        trackerActor.right.get.ask(message)(timeout)
 +      }
        return Await.result(future, timeout)
      } catch {
        case e: Exception =>
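
[Editor's note] The hunk above changes trackerActor from a plain ActorRef to an Either[ActorRef, ActorSelection] and routes ask() through whichever side is set. A minimal, self-contained sketch of that dispatch pattern — assuming an Akka 2.2-era API and a hypothetical EchoActor standing in for the real tracker actor — could look like this (not Spark's code):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout

    // Trivial stand-in for the tracker actor: replies with whatever it receives.
    class EchoActor extends Actor {
      def receive = { case msg => sender ! msg }
    }

    object AskEitherSketch extends App {
      val system = ActorSystem("sketch")
      implicit val timeout: Timeout = Timeout(10.seconds)

      // The tracker handle may be a local ActorRef or a remote ActorSelection.
      val trackerActor: Either[ActorRef, ActorSelection] =
        Left(system.actorOf(Props[EchoActor], "echo"))

      def askTracker(message: Any): Any = {
        val future: Future[Any] = trackerActor match {
          case Left(ref)        => ref.ask(message)        // local actor
          case Right(selection) => selection.ask(message)  // remote selection
        }
        Await.result(future, timeout.duration)
      }

      println(askTracker("ping"))  // prints "ping"
      system.shutdown()
    }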

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 000d1ee,572fc34..070f10f
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@@ -45,30 -47,72 +47,71 @@@ private[spark] class Client
      listener: ClientListener)
    extends Logging {
  
+   val REGISTRATION_TIMEOUT = 20.seconds
+   val REGISTRATION_RETRIES = 3
+ 
++  var prevMaster: ActorRef = null // set for unwatching, when it fails.
    var actor: ActorRef = null
    var appId: String = null
+   var registered = false
+   var activeMasterUrl: String = null
  
    class ClientActor extends Actor with Logging {
 -    var master: ActorRef = null
 -    var masterAddress: Address = null
 +    var master: ActorSelection = null
      var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
+     var alreadyDead = false  // To avoid calling listener.dead() multiple times
  
      override def preStart() {
-       logInfo("Connecting to master " + masterUrl)
        try {
-         master = context.actorSelection(Master.toAkkaUrl(masterUrl))
-         master ! RegisterApplication(appDescription)
+         registerWithMaster()
        } catch {
          case e: Exception =>
-           logError("Failed to connect to master", e)
+           logWarning("Failed to connect to master", e)
            markDisconnected()
            context.stop(self)
        }
      }
  
+     def tryRegisterAllMasters() {
+       for (masterUrl <- masterUrls) {
+         logInfo("Connecting to master " + masterUrl + "...")
 -        val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
++        val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+         actor ! RegisterApplication(appDescription)
+       }
+     }
+ 
+     def registerWithMaster() {
+       tryRegisterAllMasters()
+ 
+       import context.dispatcher
+       var retries = 0
+       lazy val retryTimer: Cancellable =
+         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+           retries += 1
+           if (registered) {
+             retryTimer.cancel()
+           } else if (retries >= REGISTRATION_RETRIES) {
+             logError("All masters are unresponsive! Giving up.")
+             markDead()
+           } else {
+             tryRegisterAllMasters()
+           }
+         }
+       retryTimer // start timer
+     }
+ 
+     def changeMaster(url: String) {
+       activeMasterUrl = url
 -      master = context.actorFor(Master.toAkkaUrl(url))
 -      masterAddress = master.path.address
 -      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 -      context.watch(master)  // Doesn't work with remote actors, but useful for testing
++      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+     }
+ 
      override def receive = {
-       case RegisteredApplication(appId_) =>
+       case RegisteredApplication(appId_, masterUrl) =>
 +        context.watch(sender)
++        prevMaster = sender
          appId = appId_
+         registered = true
+         changeMaster(masterUrl)
          listener.connected(appId)
  
        case ApplicationRemoved(message) =>
@@@ -89,13 -133,27 +132,19 @@@
            listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
          }
  
+       case MasterChanged(masterUrl, masterWebUiUrl) =>
+         logInfo("Master has changed, new master is at " + masterUrl)
 -        context.unwatch(master)
++        context.unwatch(prevMaster)
+         changeMaster(masterUrl)
+         alreadyDisconnected = false
+         sender ! MasterChangeAcknowledged(appId)
+ 
 -      case Terminated(actor_) if actor_ == master =>
 -        logWarning("Connection to master failed; waiting for master to reconnect...")
 -        markDisconnected()
 -
 -      case DisassociatedEvent(_, address, _) if address == masterAddress =>
 -        logWarning("Connection to master failed; waiting for master to reconnect...")
 -        markDisconnected()
 -
 -      case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
 -        logWarning("Connection to master failed; waiting for master to reconnect...")
 +      case Terminated(actor_) =>
-         logError(s"Connection to $actor_ dropped, stopping client")
++        logWarning(s"Connection to $actor_ failed; waiting for master to reconnect...")
          markDisconnected()
-         context.stop(self)
  
        case StopClient =>
-         markDisconnected()
+         markDead()
          sender ! true
          context.stop(self)
      }
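
[Editor's note] registerWithMaster() above drives repeated registration attempts with a self-cancelling timer: a lazy Cancellable whose scheduled closure can refer back to, and cancel, the timer it belongs to. A standalone sketch of that retry pattern, using made-up constant and message names rather than Spark's DeployMessages, might be:

    import scala.concurrent.duration._
    import akka.actor.{Actor, ActorSystem, Cancellable, Props}

    // Illustrative actor only; constants and messages are placeholders.
    class RegistrationSketch extends Actor {
      import context.dispatcher  // ExecutionContext for the scheduler

      val REGISTRATION_TIMEOUT = 1.second
      val REGISTRATION_RETRIES = 3

      var registered = false
      var retries = 0

      def tryRegisterAllMasters(): Unit =
        println("sending a registration message to every known master...")

      override def preStart() {
        tryRegisterAllMasters()
        // lazy val lets the closure cancel the very timer that runs it.
        lazy val retryTimer: Cancellable =
          context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
            retries += 1
            if (registered) {
              retryTimer.cancel()
            } else if (retries >= REGISTRATION_RETRIES) {
              println("All masters are unresponsive! Giving up.")
              context.stop(self)
            } else {
              tryRegisterAllMasters()
            }
          }
        retryTimer  // touch the lazy val so the timer actually starts
      }

      def receive = { case "registered" => registered = true }
    }

    object RegistrationSketch extends App {
      val system = ActorSystem("sketch")
      system.actorOf(Props[RegistrationSketch], "client")
    }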

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index cb0fe6a,7db5097..a7cfc25
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@@ -94,11 -100,11 +100,10 @@@ private[spark] class Master(host: Strin
    val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
  
    override def preStart() {
-     logInfo("Starting Spark master at spark://" + host + ":" + port)
+     logInfo("Starting Spark master at " + masterUrl)
      // Listen for remote client disconnection events, since they don't go through Akka's watch()
--    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      webUi.start()
-     import context.dispatcher
+     masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
      context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
  
      masterMetricsSystem.registerSource(masterSource)
@@@ -113,27 -144,56 +143,56 @@@
    }
  
    override def receive = {
-     case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
+     case ElectedLeader => {
+       val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
+       state = if (storedApps.isEmpty && storedWorkers.isEmpty)
+         RecoveryState.ALIVE
+       else
+         RecoveryState.RECOVERING
+ 
+       logInfo("I have been elected leader! New state: " + state)
+ 
+       if (state == RecoveryState.RECOVERING) {
+         beginRecovery(storedApps, storedWorkers)
+         context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+       }
+     }
+ 
+     case RevokedLeadership => {
+       logError("Leadership has been revoked -- master shutting down.")
+       System.exit(0)
+     }
+ 
+     case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
        logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
          host, workerPort, cores, Utils.megabytesToString(memory)))
-       if (idToWorker.contains(id)) {
+       if (state == RecoveryState.STANDBY) {
+         // ignore, don't send response
+       } else if (idToWorker.contains(id)) {
          sender ! RegisterWorkerFailed("Duplicate worker ID")
        } else {
-         addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
-         context.watch(sender)  // This doesn't work with remote actors but helps for testing
-         sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
+         val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+         registerWorker(worker)
 -        context.watch(sender)  // This doesn't work with remote actors but helps for testing
++        context.watch(sender)
+         persistenceEngine.addWorker(worker)
+         sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
        }
      }
  
      case RegisterApplication(description) => {
-       logInfo("Registering app " + description.name)
-       val app = addApplication(description, sender)
-       logInfo("Registered app " + description.name + " with ID " + app.id)
-       waitingApps += app
-       context.watch(sender)  // This doesn't work with remote actors but helps for testing
-       sender ! RegisteredApplication(app.id)
-       schedule()
+       if (state == RecoveryState.STANDBY) {
+         // ignore, don't send response
+       } else {
+         logInfo("Registering app " + description.name)
+         val app = createApplication(description, sender)
+         registerApplication(app)
+         logInfo("Registered app " + description.name + " with ID " + app.id)
 -        context.watch(sender)  // This doesn't work with remote actors but helps for testing
++        context.watch(sender)
+         persistenceEngine.addApplication(app)
+         sender ! RegisteredApplication(app.id, masterUrl)
+         schedule()
+       }
      }
  
      case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
@@@ -178,22 -270,26 +269,12 @@@
        // those we have an entry for in the corresponding actor hashmap
        actorToWorker.get(actor).foreach(removeWorker)
        actorToApp.get(actor).foreach(finishApplication)
-     }
- 
-     case DisassociatedEvent(_, address, _) => {
-       // The disconnected client could've been either a worker or an app; remove whichever it was
-       addressToWorker.get(address).foreach(removeWorker)
-       addressToApp.get(address).foreach(finishApplication)
-     }
- 
-     case AssociationErrorEvent(_, _, address, _) => {
-       // The disconnected client could've been either a worker or an app; remove whichever it was
-       addressToWorker.get(address).foreach(removeWorker)
-       addressToApp.get(address).foreach(finishApplication)
+       if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
      }
  
 -    case DisassociatedEvent(_, address, _) => {
 -      // The disconnected client could've been either a worker or an app; remove whichever it was
 -      addressToWorker.get(address).foreach(removeWorker)
 -      addressToApp.get(address).foreach(finishApplication)
 -      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
 -    }
 -
 -    case AssociationErrorEvent(_, _, address, _) => {
 -      // The disconnected client could've been either a worker or an app; remove whichever it was
 -      addressToWorker.get(address).foreach(removeWorker)
 -      addressToApp.get(address).foreach(finishApplication)
 -      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
 -    }
 -
      case RequestMasterState => {
-       sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+       sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+         state)
      }
  
      case CheckForWorkerTimeOut => {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 400d6f2,07189ac..9472c9a
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@@ -25,27 -25,19 +25,27 @@@ import scala.collection.mutable.HashMa
  import scala.concurrent.duration._
  
  import akka.actor._
- import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
- 
 -import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 -
  import org.apache.spark.Logging
- import org.apache.spark.deploy.ExecutorState
+ import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
  import org.apache.spark.deploy.DeployMessages._
  import org.apache.spark.deploy.master.Master
  import org.apache.spark.deploy.worker.ui.WorkerWebUI
  import org.apache.spark.metrics.MetricsSystem
  import org.apache.spark.util.{Utils, AkkaUtils}
 +import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
 +import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 +import org.apache.spark.deploy.DeployMessages.KillExecutor
 +import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 +import scala.Some
 +import org.apache.spark.deploy.DeployMessages.Heartbeat
 +import org.apache.spark.deploy.DeployMessages.RegisteredWorker
 +import akka.remote.DisassociatedEvent
 +import org.apache.spark.deploy.DeployMessages.LaunchExecutor
 +import org.apache.spark.deploy.DeployMessages.RegisterWorker
  
- 
+ /**
+   * @param masterUrls Each url should look like spark://host:port.
+   */
  private[spark] class Worker(
      host: String,
      port: Int,
@@@ -64,8 -57,18 +65,19 @@@
    // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
    val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
  
+   val REGISTRATION_TIMEOUT = 20.seconds
+   val REGISTRATION_RETRIES = 3
+ 
+   // Index into masterUrls that we're currently trying to register with.
+   var masterIndex = 0
+ 
+   val masterLock: Object = new Object()
 -  var master: ActorRef = null
 +  var master: ActorSelection = null
-   var masterWebUiUrl : String = ""
++  var prevMaster: ActorRef = null
+   var activeMasterUrl: String = ""
+   var activeMasterWebUiUrl : String = ""
+   @volatile var registered = false
+   @volatile var connected = false
    val workerId = generateWorkerId()
    var sparkHome: File = null
    var workDir: File = null
@@@ -119,40 -123,92 +132,93 @@@
      metricsSystem.start()
    }
  
-   def connectToMaster() {
-     logInfo("Connecting to master " + masterUrl)
-     master = context.actorSelection(Master.toAkkaUrl(masterUrl))
-     master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
+   def changeMaster(url: String, uiUrl: String) {
+     masterLock.synchronized {
+       activeMasterUrl = url
+       activeMasterWebUiUrl = uiUrl
 -      master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
 -      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 -      context.watch(master) // Doesn't work with remote actors, but useful for testing
++      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+       connected = true
+     }
    }
  
-   import context.dispatcher
+   def tryRegisterAllMasters() {
+     for (masterUrl <- masterUrls) {
+       logInfo("Connecting to master " + masterUrl + "...")
 -      val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
++      val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+       actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get,
+         publicAddress)
+     }
+   }
+ 
+   def registerWithMaster() {
+     tryRegisterAllMasters()
+ 
+     var retries = 0
+     lazy val retryTimer: Cancellable =
+       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+         retries += 1
+         if (registered) {
+           retryTimer.cancel()
+         } else if (retries >= REGISTRATION_RETRIES) {
+           logError("All masters are unresponsive! Giving up.")
+           System.exit(1)
+         } else {
+           tryRegisterAllMasters()
+         }
+       }
+     retryTimer // start timer
+   }
  
    override def receive = {
-     case RegisteredWorker(url) =>
-       masterWebUiUrl = url
-       logInfo("Successfully registered with master")
+     case RegisteredWorker(masterUrl, masterWebUiUrl) =>
+       logInfo("Successfully registered with master " + masterUrl)
+       registered = true
 +      context.watch(sender) // remote death watch for master
-       //TODO: Is heartbeat really necessary akka does it anyway !
-         context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
-         master ! Heartbeat(workerId)
++      prevMaster = sender
+       changeMaster(masterUrl, masterWebUiUrl)
+       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+ 
+     case SendHeartbeat =>
+       masterLock.synchronized {
+         if (connected) { master ! Heartbeat(workerId) }
        }
  
+     case MasterChanged(masterUrl, masterWebUiUrl) =>
+       logInfo("Master has changed, new master is at " + masterUrl)
 -      context.unwatch(master)
++      context.unwatch(prevMaster)
++      prevMaster = sender
+       changeMaster(masterUrl, masterWebUiUrl)
+ 
+       val execs = executors.values.
+         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
+       sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+ 
      case RegisterWorkerFailed(message) =>
-       logError("Worker registration failed: " + message)
-       System.exit(1)
- 
-     case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
-       logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-       val manager = new ExecutorRunner(
-         appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
-       executors(appId + "/" + execId) = manager
-       manager.start()
-       coresUsed += cores_
-       memoryUsed += memory_
-       master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
+       if (!registered) {
+         logError("Worker registration failed: " + message)
+         System.exit(1)
+       }
+ 
+     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+       if (masterUrl != activeMasterUrl) {
+         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+       } else {
+         logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+         val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+           self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
+         executors(appId + "/" + execId) = manager
+         manager.start()
+         coresUsed += cores_
+         memoryUsed += memory_
+         masterLock.synchronized {
+           master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+         }
+       }
  
      case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
-       master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+       masterLock.synchronized {
+         master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+       }
        val fullId = appId + "/" + execId
        if (ExecutorState.isFinished(state)) {
          val executor = executors(fullId)
@@@ -165,18 -221,21 +231,22 @@@
          memoryUsed -= executor.memory
        }
  
-     case KillExecutor(appId, execId) =>
-       val fullId = appId + "/" + execId
-       executors.get(fullId) match {
-         case Some(executor) =>
-           logInfo("Asked to kill executor " + fullId)
-           executor.kill()
-         case None =>
-           logInfo("Asked to kill unknown executor " + fullId)
+     case KillExecutor(masterUrl, appId, execId) =>
+       if (masterUrl != activeMasterUrl) {
+         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
+       } else {
+         val fullId = appId + "/" + execId
+         executors.get(fullId) match {
+           case Some(executor) =>
+             logInfo("Asked to kill executor " + fullId)
+             executor.kill()
+           case None =>
+             logInfo("Asked to kill unknown executor " + fullId)
+         }
        }
  
 -    case DisassociatedEvent(_, address, _) if address == master.path.address =>
 +    case Terminated(actor_) =>
 +      logInfo(s"$actor_ terminated !")
        masterDisconnected()
  
      case RequestWorkerState => {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 0000000,50302fc..16d8f81
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@@ -1,0 -1,126 +1,126 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.executor
+ 
+ import java.nio.ByteBuffer
+ 
+ import akka.actor._
+ import akka.remote._
+ 
+ import org.apache.spark.{Logging, SparkEnv}
+ import org.apache.spark.TaskState.TaskState
+ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+ import org.apache.spark.util.{Utils, AkkaUtils}
+ import akka.remote.DisassociatedEvent
+ import akka.remote.AssociationErrorEvent
+ import akka.remote.DisassociatedEvent
+ import akka.actor.Terminated
+ 
+ 
+ private[spark] class CoarseGrainedExecutorBackend(
+     driverUrl: String,
+     executorId: String,
+     hostPort: String,
+     cores: Int)
+   extends Actor
+   with ExecutorBackend
+   with Logging {
+ 
+   Utils.checkHostPort(hostPort, "Expected hostport")
+ 
+   var executor: Executor = null
+   var driver: ActorSelection = null
+ 
+   override def preStart() {
+     logInfo("Connecting to driver: " + driverUrl)
+     driver = context.actorSelection(driverUrl)
+     driver ! RegisterExecutor(executorId, hostPort, cores)
+     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
 -   // context.watch(driver) // Doesn't work with remote actors, but useful for testing
+   }
+ 
+   override def receive = {
+     case RegisteredExecutor(sparkProperties) =>
+       logInfo("Successfully registered with driver")
++      context.watch(sender) //Start watching for terminated messages.
+       // Make this host instead of hostPort ?
+       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
+ 
+     case RegisterExecutorFailed(message) =>
+       logError("Slave registration failed: " + message)
+       System.exit(1)
+ 
+     case LaunchTask(taskDesc) =>
+       logInfo("Got assigned task " + taskDesc.taskId)
+       if (executor == null) {
+         logError("Received LaunchTask command but executor was null")
+         System.exit(1)
+       } else {
+         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+       }
+ 
+     case KillTask(taskId, _) =>
+       if (executor == null) {
+         logError("Received KillTask command but executor was null")
+         System.exit(1)
+       } else {
+         executor.killTask(taskId)
+       }
+ 
 -    case DisassociatedEvent(_, _, _) =>
 -      logError("Driver terminated or disconnected! Shutting down.")
++    case Terminated(actor) =>
++      logError(s"Driver $actor terminated or disconnected! Shutting down.")
+       System.exit(1)
+ 
+     case StopExecutor =>
+       logInfo("Driver commanded a shutdown")
+       context.stop(self)
+       context.system.shutdown()
+   }
+ 
+   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+     driver ! StatusUpdate(executorId, taskId, state, data)
+   }
+ }
+ 
+ private[spark] object CoarseGrainedExecutorBackend {
+   def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
+     // Debug code
+     Utils.checkHost(hostname)
+ 
+     // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+     // before getting started with all our system properties, etc
+     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+     // set it
+     val sparkHostPort = hostname + ":" + boundPort
+     System.setProperty("spark.hostPort", sparkHostPort)
+     actorSystem.actorOf(
+       Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+       name = "Executor")
+     actorSystem.awaitTermination()
+   }
+ 
+   def main(args: Array[String]) {
+     if (args.length < 4) {
+       //the reason we allow the last appid argument is to make it easy to kill rogue executors
+       System.err.println(
+         "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
+         "[<appid>]")
+       System.exit(1)
+     }
+     run(args(0), args(1), args(2), args(3).toInt)
+   }
+ }
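
[Editor's note] The new backend replaces DisassociatedEvent handling with an explicit death watch: context.watch(sender) once registered, then reacting to Terminated. A minimal illustration of just that piece, with a hypothetical actor and a plain string message standing in for Spark's protocol, could be:

    import akka.actor.{Actor, ActorRef, Terminated}

    class WatchingBackend extends Actor {
      var driver: ActorRef = null

      def receive = {
        case "registered" =>
          driver = sender            // remember who registered us...
          context.watch(driver)      // ...and ask Akka for Terminated if it dies
        case Terminated(actor) =>
          println(s"$actor terminated or disconnected, shutting down")
          context.stop(self)
      }
    }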

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0000000,3ccc38d..03cf1e2
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@@ -1,0 -1,238 +1,233 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.scheduler.cluster
+ 
+ import java.util.concurrent.atomic.AtomicInteger
+ 
+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+ import scala.concurrent.Await
+ import scala.concurrent.duration._
+ 
+ import akka.actor._
+ import akka.pattern.ask
+ import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+ 
+ import org.apache.spark.{SparkException, Logging, TaskState}
+ import org.apache.spark.scheduler.TaskDescription
+ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+ import org.apache.spark.util.Utils
+ 
+ /**
+  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
+  * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
+  * executors whenever a task is done and asking the scheduler to launch a new executor for
+  * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
+  * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
+  * (spark.deploy.*).
+  */
+ private[spark]
+ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+   extends SchedulerBackend with Logging
+ {
+   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
+   var totalCoreCount = new AtomicInteger(0)
+ 
+   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+     private val executorActor = new HashMap[String, ActorRef]
+     private val executorAddress = new HashMap[String, Address]
+     private val executorHost = new HashMap[String, String]
+     private val freeCores = new HashMap[String, Int]
+     private val actorToExecutorId = new HashMap[ActorRef, String]
+     private val addressToExecutorId = new HashMap[Address, String]
+ 
+     override def preStart() {
+       // Listen for remote client disconnection events, since they don't go through Akka's watch()
+       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ 
+       // Periodically revive offers to allow delay scheduling to work
+       val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
+       import context.dispatcher
+       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
+     }
+ 
+     def receive = {
+       case RegisterExecutor(executorId, hostPort, cores) =>
+         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
+         if (executorActor.contains(executorId)) {
+           sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
+         } else {
+           logInfo("Registered executor: " + sender + " with ID " + executorId)
+           sender ! RegisteredExecutor(sparkProperties)
+           context.watch(sender)
+           executorActor(executorId) = sender
+           executorHost(executorId) = Utils.parseHostPort(hostPort)._1
+           freeCores(executorId) = cores
+           executorAddress(executorId) = sender.path.address
+           actorToExecutorId(sender) = executorId
+           addressToExecutorId(sender.path.address) = executorId
+           totalCoreCount.addAndGet(cores)
+           makeOffers()
+         }
+ 
+       case StatusUpdate(executorId, taskId, state, data) =>
+         scheduler.statusUpdate(taskId, state, data.value)
+         if (TaskState.isFinished(state)) {
+           if (executorActor.contains(executorId)) {
+             freeCores(executorId) += 1
+             makeOffers(executorId)
+           } else {
+             // Ignoring the update since we don't know about the executor.
+             val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
+             logWarning(msg.format(taskId, state, sender, executorId))
+           }
+         }
+ 
+       case ReviveOffers =>
+         makeOffers()
+ 
+       case KillTask(taskId, executorId) =>
+         executorActor(executorId) ! KillTask(taskId, executorId)
+ 
+       case StopDriver =>
+         sender ! true
+         context.stop(self)
+ 
+       case StopExecutors =>
+         logInfo("Asking each executor to shut down")
+         for (executor <- executorActor.values) {
+           executor ! StopExecutor
+         }
+         sender ! true
+ 
+       case RemoveExecutor(executorId, reason) =>
+         removeExecutor(executorId, reason)
+         sender ! true
+ 
+       case Terminated(actor) =>
+         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
+ 
 -      case DisassociatedEvent(_, remoteAddress, _) =>
 -        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
 -
 -      case AssociationErrorEvent(_, _, remoteAddress, _) =>
 -        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
+     }
+ 
+     // Make fake resource offers on all executors
+     def makeOffers() {
+       launchTasks(scheduler.resourceOffers(
+         executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+     }
+ 
+     // Make fake resource offers on just one executor
+     def makeOffers(executorId: String) {
+       launchTasks(scheduler.resourceOffers(
+         Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+     }
+ 
+     // Launch tasks returned by a set of resource offers
+     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+       for (task <- tasks.flatten) {
+         freeCores(task.executorId) -= 1
+         executorActor(task.executorId) ! LaunchTask(task)
+       }
+     }
+ 
+     // Remove a disconnected slave from the cluster
+     def removeExecutor(executorId: String, reason: String) {
+       if (executorActor.contains(executorId)) {
+         logInfo("Executor " + executorId + " disconnected, so removing it")
+         val numCores = freeCores(executorId)
+         actorToExecutorId -= executorActor(executorId)
+         addressToExecutorId -= executorAddress(executorId)
+         executorActor -= executorId
+         executorHost -= executorId
+         freeCores -= executorId
+         totalCoreCount.addAndGet(-numCores)
+         scheduler.executorLost(executorId, SlaveLost(reason))
+       }
+     }
+   }
+ 
+   var driverActor: ActorRef = null
+   val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+ 
+   override def start() {
+     val properties = new ArrayBuffer[(String, String)]
+     val iterator = System.getProperties.entrySet.iterator
+     while (iterator.hasNext) {
+       val entry = iterator.next
+       val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+       if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
+         properties += ((key, value))
+       }
+     }
+     driverActor = actorSystem.actorOf(
+       Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
+   }
+ 
+   private val timeout = {
+     Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+   }
+ 
+   def stopExecutors() {
+     try {
+       if (driverActor != null) {
+         logInfo("Shutting down all executors")
+         val future = driverActor.ask(StopExecutors)(timeout)
+         Await.ready(future, timeout)
+       }
+     } catch {
+       case e: Exception =>
+         throw new SparkException("Error asking standalone scheduler to shut down executors", e)
+     }
+   }
+ 
+   override def stop() {
+     try {
+       if (driverActor != null) {
+         val future = driverActor.ask(StopDriver)(timeout)
+         Await.ready(future, timeout)
+       }
+     } catch {
+       case e: Exception =>
+         throw new SparkException("Error stopping standalone scheduler's driver actor", e)
+     }
+   }
+ 
+   override def reviveOffers() {
+     driverActor ! ReviveOffers
+   }
+ 
+   override def killTask(taskId: Long, executorId: String) {
+     driverActor ! KillTask(taskId, executorId)
+   }
+ 
+   override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+       .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
+ 
+   // Called by subclasses when notified of a lost worker
+   def removeExecutor(executorId: String, reason: String) {
+     try {
+       val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
+       Await.ready(future, timeout)
+     } catch {
+       case e: Exception =>
+         throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+     }
+   }
+ }
+ 
+ private[spark] object CoarseGrainedSchedulerBackend {
+   val ACTOR_NAME = "CoarseGrainedScheduler"
+ }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 955f6cd,fd17460..271dc90
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@@ -48,15 -48,15 +48,15 @@@ class MapOutputTrackerSuite extends Fun
  
    test("master start and stop") {
      val actorSystem = ActorSystem("test")
-     val tracker = new MapOutputTracker()
-     tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker))))
+     val tracker = new MapOutputTrackerMaster()
 -    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
++    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
      tracker.stop()
    }
  
    test("master register and fetch") {
      val actorSystem = ActorSystem("test")
-     val tracker = new MapOutputTracker()
-     tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker))))
+     val tracker = new MapOutputTrackerMaster()
 -    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
++    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
      tracker.registerShuffle(10, 2)
      val compressedSize1000 = MapOutputTracker.compressSize(1000L)
      val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@@ -74,8 -74,8 +74,8 @@@
  
    test("master register and unregister and fetch") {
      val actorSystem = ActorSystem("test")
-     val tracker = new MapOutputTracker()
-     tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker))))
+     val tracker = new MapOutputTrackerMaster()
 -    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
++    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
      tracker.registerShuffle(10, 2)
      val compressedSize1000 = MapOutputTracker.compressSize(1000L)
      val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@@ -102,9 -100,9 +100,9 @@@
      System.setProperty("spark.driver.port", boundPort.toString)    // Will be cleared by LocalSparkContext
      System.setProperty("spark.hostPort", hostname + ":" + boundPort)
  
-     val masterTracker = new MapOutputTracker()
+     val masterTracker = new MapOutputTrackerMaster()
 -    masterTracker.trackerActor = actorSystem.actorOf(
 -        Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
 +    masterTracker.trackerActor = Left(actorSystem.actorOf(
-         Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker"))
++        Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
  
      val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
      val slaveTracker = new MapOutputTracker()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------