You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:41:57 UTC
[13/50] [abbrv] git commit: Merge branch 'scala210-master' of
github.com:colorant/incubator-spark into scala-2.10
Merge branch 'scala210-master' of github.com:colorant/incubator-spark into scala-2.10
Conflicts:
core/src/main/scala/org/apache/spark/deploy/client/Client.scala
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/199e9cf0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/199e9cf0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/199e9cf0
Branch: refs/heads/master
Commit: 199e9cf02dfaa372c1f067bca54556e1f6ce787d
Parents: 6860b79 f6b2e59
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Thu Nov 21 11:55:48 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Thu Nov 21 11:55:48 2013 +0530
----------------------------------------------------------------------
README.md | 2 +-
bin/compute-classpath.sh | 22 +-
bin/slaves.sh | 19 +-
bin/spark-daemon.sh | 21 +-
bin/spark-daemons.sh | 2 +-
bin/stop-slaves.sh | 2 -
core/pom.xml | 4 +
.../spark/network/netty/FileClientHandler.java | 3 +-
.../spark/network/netty/FileServerHandler.java | 23 +-
.../spark/network/netty/PathResolver.java | 11 +-
.../hadoop/mapred/SparkHadoopMapRedUtil.scala | 17 +-
.../mapreduce/SparkHadoopMapReduceUtil.scala | 33 +-
.../scala/org/apache/spark/Aggregator.scala | 49 +-
.../apache/spark/BlockStoreShuffleFetcher.scala | 23 +-
.../scala/org/apache/spark/CacheManager.scala | 12 +-
.../scala/org/apache/spark/FutureAction.scala | 250 +++++
.../apache/spark/InterruptibleIterator.scala | 30 +
.../org/apache/spark/MapOutputTracker.scala | 169 +--
.../scala/org/apache/spark/ShuffleFetcher.scala | 5 +-
.../scala/org/apache/spark/SparkContext.scala | 234 ++--
.../main/scala/org/apache/spark/SparkEnv.scala | 25 +-
.../org/apache/spark/SparkHadoopWriter.scala | 21 +-
.../scala/org/apache/spark/TaskContext.scala | 21 +-
.../scala/org/apache/spark/TaskEndReason.scala | 2 +
.../apache/spark/api/java/JavaDoubleRDD.scala | 24 +
.../org/apache/spark/api/java/JavaPairRDD.scala | 35 +
.../org/apache/spark/api/java/JavaRDD.scala | 19 +
.../java/function/DoubleFlatMapFunction.java | 10 +-
.../spark/api/java/function/DoubleFunction.java | 3 +-
.../api/java/function/FlatMapFunction.scala | 3 -
.../api/java/function/FlatMapFunction2.scala | 3 -
.../spark/api/java/function/Function.java | 4 +-
.../spark/api/java/function/Function2.java | 2 -
.../spark/api/java/function/Function3.java | 36 +
.../api/java/function/PairFlatMapFunction.java | 2 -
.../spark/api/java/function/PairFunction.java | 5 +-
.../api/java/function/WrappedFunction3.scala | 34 +
.../org/apache/spark/api/python/PythonRDD.scala | 2 +-
.../spark/broadcast/BitTorrentBroadcast.scala | 1058 ------------------
.../apache/spark/broadcast/HttpBroadcast.scala | 13 +-
.../apache/spark/broadcast/MultiTracker.scala | 410 -------
.../org/apache/spark/broadcast/SourceInfo.scala | 54 -
.../spark/broadcast/TorrentBroadcast.scala | 247 ++++
.../apache/spark/broadcast/TreeBroadcast.scala | 603 ----------
.../org/apache/spark/deploy/DeployMessage.scala | 29 +-
.../spark/deploy/ExecutorDescription.scala | 34 +
.../spark/deploy/FaultToleranceTest.scala | 420 +++++++
.../org/apache/spark/deploy/JsonProtocol.scala | 3 +-
.../apache/spark/deploy/LocalSparkCluster.scala | 7 +-
.../apache/spark/deploy/SparkHadoopUtil.scala | 60 +-
.../org/apache/spark/deploy/client/Client.scala | 74 +-
.../spark/deploy/client/ClientListener.scala | 4 +
.../apache/spark/deploy/client/TestClient.scala | 7 +-
.../spark/deploy/master/ApplicationInfo.scala | 53 +-
.../spark/deploy/master/ApplicationState.scala | 5 +-
.../spark/deploy/master/ExecutorInfo.scala | 7 +-
.../master/FileSystemPersistenceEngine.scala | 90 ++
.../deploy/master/LeaderElectionAgent.scala | 45 +
.../org/apache/spark/deploy/master/Master.scala | 256 ++++-
.../spark/deploy/master/MasterMessages.scala | 46 +
.../spark/deploy/master/PersistenceEngine.scala | 53 +
.../spark/deploy/master/RecoveryState.scala | 26 +
.../deploy/master/SparkZooKeeperSession.scala | 203 ++++
.../apache/spark/deploy/master/WorkerInfo.scala | 42 +-
.../spark/deploy/master/WorkerState.scala | 2 +-
.../master/ZooKeeperLeaderElectionAgent.scala | 136 +++
.../master/ZooKeeperPersistenceEngine.scala | 85 ++
.../spark/deploy/worker/ExecutorRunner.scala | 15 +-
.../org/apache/spark/deploy/worker/Worker.scala | 166 ++-
.../spark/deploy/worker/WorkerArguments.scala | 8 +-
.../spark/deploy/worker/ui/WorkerWebUI.scala | 2 +-
.../executor/CoarseGrainedExecutorBackend.scala | 126 +++
.../org/apache/spark/executor/Executor.scala | 173 ++-
.../spark/executor/MesosExecutorBackend.scala | 18 +-
.../executor/StandaloneExecutorBackend.scala | 119 --
.../org/apache/spark/executor/TaskMetrics.scala | 5 +
.../spark/network/ConnectionManager.scala | 3 +-
.../apache/spark/network/netty/FileHeader.scala | 22 +-
.../spark/network/netty/ShuffleCopier.scala | 27 +-
.../spark/network/netty/ShuffleSender.scala | 9 +-
.../main/scala/org/apache/spark/package.scala | 2 +
.../org/apache/spark/rdd/AsyncRDDActions.scala | 123 ++
.../scala/org/apache/spark/rdd/BlockRDD.scala | 9 +-
.../org/apache/spark/rdd/CheckpointRDD.scala | 9 +-
.../org/apache/spark/rdd/CoGroupedRDD.scala | 26 +-
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 125 +--
.../spark/rdd/MapPartitionsWithContextRDD.scala | 42 +
.../spark/rdd/MapPartitionsWithIndexRDD.scala | 42 -
.../org/apache/spark/rdd/NewHadoopRDD.scala | 79 +-
.../org/apache/spark/rdd/PairRDDFunctions.scala | 16 +-
.../spark/rdd/ParallelCollectionRDD.scala | 5 +-
.../main/scala/org/apache/spark/rdd/RDD.scala | 106 +-
.../org/apache/spark/rdd/ShuffledRDD.scala | 2 +-
.../org/apache/spark/rdd/SubtractedRDD.scala | 2 +-
.../apache/spark/scheduler/DAGScheduler.scala | 292 ++---
.../spark/scheduler/DAGSchedulerEvent.scala | 29 +-
.../spark/scheduler/DAGSchedulerSource.scala | 2 +-
.../spark/scheduler/InputFormatInfo.scala | 7 +-
.../org/apache/spark/scheduler/JobLogger.scala | 676 ++++++-----
.../org/apache/spark/scheduler/JobWaiter.scala | 62 +-
.../scala/org/apache/spark/scheduler/Pool.scala | 5 +-
.../org/apache/spark/scheduler/ResultTask.scala | 48 +-
.../spark/scheduler/SchedulableBuilder.scala | 3 +
.../apache/spark/scheduler/ShuffleMapTask.scala | 76 +-
.../apache/spark/scheduler/SparkListener.scala | 21 +-
.../spark/scheduler/SparkListenerBus.scala | 2 +
.../org/apache/spark/scheduler/Stage.scala | 6 +-
.../org/apache/spark/scheduler/StageInfo.scala | 13 +-
.../scala/org/apache/spark/scheduler/Task.scala | 63 +-
.../org/apache/spark/scheduler/TaskInfo.scala | 20 +
.../org/apache/spark/scheduler/TaskResult.scala | 3 +-
.../apache/spark/scheduler/TaskScheduler.scala | 10 +-
.../spark/scheduler/TaskSchedulerListener.scala | 44 -
.../org/apache/spark/scheduler/TaskSet.scala | 4 +
.../scheduler/cluster/ClusterScheduler.scala | 79 +-
.../cluster/ClusterTaskSetManager.scala | 106 +-
.../cluster/CoarseGrainedClusterMessage.scala | 69 ++
.../cluster/CoarseGrainedSchedulerBackend.scala | 233 ++++
.../scheduler/cluster/SchedulerBackend.scala | 6 +-
.../cluster/SimrSchedulerBackend.scala | 78 ++
.../cluster/SparkDeploySchedulerBackend.scala | 20 +-
.../cluster/StandaloneClusterMessage.scala | 63 --
.../cluster/StandaloneSchedulerBackend.scala | 195 ----
.../scheduler/cluster/TaskResultGetter.scala | 26 +-
.../mesos/CoarseMesosSchedulerBackend.scala | 16 +-
.../spark/scheduler/local/LocalScheduler.scala | 196 ++--
.../scheduler/local/LocalTaskSetManager.scala | 24 +-
.../spark/serializer/KryoSerializer.scala | 52 +-
.../apache/spark/storage/BlockException.scala | 2 +-
.../spark/storage/BlockFetcherIterator.scala | 24 +-
.../org/apache/spark/storage/BlockId.scala | 103 ++
.../org/apache/spark/storage/BlockInfo.scala | 81 ++
.../org/apache/spark/storage/BlockManager.scala | 632 ++++-------
.../spark/storage/BlockManagerMaster.scala | 8 +-
.../spark/storage/BlockManagerMasterActor.scala | 25 +-
.../spark/storage/BlockManagerMessages.scala | 16 +-
.../spark/storage/BlockManagerSlaveActor.scala | 1 +
.../spark/storage/BlockManagerWorker.scala | 4 +-
.../org/apache/spark/storage/BlockMessage.scala | 38 +-
.../spark/storage/BlockMessageArray.scala | 7 +-
.../spark/storage/BlockObjectWriter.scala | 142 ++-
.../org/apache/spark/storage/BlockStore.scala | 14 +-
.../apache/spark/storage/DiskBlockManager.scala | 151 +++
.../org/apache/spark/storage/DiskStore.scala | 280 +----
.../org/apache/spark/storage/FileSegment.scala | 28 +
.../org/apache/spark/storage/MemoryStore.scala | 34 +-
.../spark/storage/ShuffleBlockManager.scala | 200 +++-
.../spark/storage/StoragePerfTester.scala | 86 ++
.../org/apache/spark/storage/StorageUtils.scala | 47 +-
.../apache/spark/storage/ThreadingTest.scala | 6 +-
.../apache/spark/ui/UIWorkloadGenerator.scala | 2 +-
.../org/apache/spark/ui/jobs/IndexPage.scala | 2 +-
.../spark/ui/jobs/JobProgressListener.scala | 105 +-
.../org/apache/spark/ui/jobs/PoolTable.scala | 8 +-
.../org/apache/spark/ui/jobs/StagePage.scala | 48 +-
.../org/apache/spark/ui/jobs/StageTable.scala | 33 +-
.../org/apache/spark/ui/storage/RDDPage.scala | 23 +-
.../org/apache/spark/util/AppendOnlyMap.scala | 230 ++++
.../org/apache/spark/util/MetadataCleaner.scala | 37 +-
.../scala/org/apache/spark/util/Utils.scala | 48 +-
.../apache/spark/util/collection/BitSet.scala | 103 ++
.../spark/util/collection/OpenHashMap.scala | 153 +++
.../spark/util/collection/OpenHashSet.scala | 272 +++++
.../collection/PrimitiveKeyOpenHashMap.scala | 128 +++
.../spark/util/collection/PrimitiveVector.scala | 53 +
.../scala/org/apache/spark/BroadcastSuite.scala | 52 +-
.../org/apache/spark/CacheManagerSuite.scala | 21 +-
.../org/apache/spark/CheckpointSuite.scala | 10 +-
.../org/apache/spark/DistributedSuite.scala | 16 +-
.../org/apache/spark/FileServerSuite.scala | 16 +
.../scala/org/apache/spark/JavaAPISuite.java | 23 +-
.../org/apache/spark/JobCancellationSuite.scala | 209 ++++
.../apache/spark/MapOutputTrackerSuite.scala | 20 +-
.../apache/spark/deploy/JsonProtocolSuite.scala | 7 +-
.../deploy/worker/ExecutorRunnerTest.scala | 19 +
.../apache/spark/rdd/AsyncRDDActionsSuite.scala | 176 +++
.../spark/rdd/PairRDDFunctionsSuite.scala | 2 +-
.../scala/org/apache/spark/rdd/RDDSuite.scala | 20 +
.../spark/scheduler/DAGSchedulerSuite.scala | 37 +-
.../apache/spark/scheduler/JobLoggerSuite.scala | 17 +-
.../spark/scheduler/SparkListenerSuite.scala | 136 ++-
.../cluster/ClusterTaskSetManagerSuite.scala | 49 +-
.../spark/scheduler/cluster/FakeTask.scala | 5 +-
.../cluster/TaskResultGetterSuite.scala | 3 +-
.../scheduler/local/LocalSchedulerSuite.scala | 28 +-
.../org/apache/spark/storage/BlockIdSuite.scala | 114 ++
.../spark/storage/BlockManagerSuite.scala | 102 +-
.../spark/storage/DiskBlockManagerSuite.scala | 84 ++
.../apache/spark/util/AppendOnlyMapSuite.scala | 154 +++
.../spark/util/collection/BitSetSuite.scala | 73 ++
.../util/collection/OpenHashMapSuite.scala | 148 +++
.../util/collection/OpenHashSetSuite.scala | 145 +++
.../PrimitiveKeyOpenHashSetSuite.scala | 90 ++
docker/README.md | 5 +
docker/build | 22 +
docker/spark-test/README.md | 11 +
docker/spark-test/base/Dockerfile | 38 +
docker/spark-test/build | 22 +
docker/spark-test/master/Dockerfile | 21 +
docker/spark-test/master/default_cmd | 22 +
docker/spark-test/worker/Dockerfile | 22 +
docker/spark-test/worker/default_cmd | 22 +
docs/cluster-overview.md | 14 +-
docs/configuration.md | 10 +-
docs/ec2-scripts.md | 2 +-
docs/hadoop-third-party-distributions.md | 4 +-
docs/python-programming-guide.md | 11 +
docs/running-on-yarn.md | 10 +-
docs/scala-programming-guide.md | 6 +-
docs/spark-standalone.md | 75 ++
docs/streaming-programming-guide.md | 9 +-
docs/tuning.md | 2 +-
ec2/spark_ec2.py | 68 +-
examples/pom.xml | 36 +-
.../streaming/examples/JavaKafkaWordCount.java | 98 ++
.../apache/spark/examples/BroadcastTest.scala | 15 +-
.../org/apache/spark/examples/SparkHdfsLR.scala | 3 +-
.../org/apache/spark/examples/SparkKMeans.scala | 2 -
.../org/apache/spark/examples/SparkPi.scala | 2 +-
.../streaming/examples/KafkaWordCount.scala | 28 +-
.../streaming/examples/MQTTWordCount.scala | 107 ++
.../clickstream/PageViewGenerator.scala | 13 +-
pom.xml | 132 ++-
project/SparkBuild.scala | 35 +-
project/plugins.sbt | 2 +-
python/pyspark/accumulators.py | 13 +-
python/pyspark/context.py | 50 +-
.../org/apache/spark/repl/SparkILoop.scala | 22 +-
.../scala/org/apache/spark/repl/ReplSuite.scala | 34 +-
spark-class | 28 +-
spark-class2.cmd | 7 +
.../kafka/0.7.2-spark/kafka-0.7.2-spark.jar | Bin 1358063 -> 0 bytes
.../kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 | 1 -
.../0.7.2-spark/kafka-0.7.2-spark.jar.sha1 | 1 -
.../kafka/0.7.2-spark/kafka-0.7.2-spark.pom | 9 -
.../kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 | 1 -
.../0.7.2-spark/kafka-0.7.2-spark.pom.sha1 | 1 -
.../apache/kafka/kafka/maven-metadata-local.xml | 12 -
.../kafka/kafka/maven-metadata-local.xml.md5 | 1 -
.../kafka/kafka/maven-metadata-local.xml.sha1 | 1 -
streaming/pom.xml | 57 +-
.../org/apache/spark/streaming/Checkpoint.scala | 2 +
.../org/apache/spark/streaming/DStream.scala | 55 +-
.../spark/streaming/NetworkInputTracker.scala | 14 +-
.../spark/streaming/PairDStreamFunctions.scala | 154 ++-
.../spark/streaming/StreamingContext.scala | 52 +-
.../spark/streaming/api/java/JavaDStream.scala | 8 +-
.../streaming/api/java/JavaDStreamLike.scala | 97 +-
.../streaming/api/java/JavaPairDStream.scala | 186 ++-
.../api/java/JavaStreamingContext.scala | 108 +-
.../streaming/dstream/CoGroupedDStream.scala | 59 -
.../streaming/dstream/KafkaInputDStream.scala | 62 +-
.../streaming/dstream/MQTTInputDStream.scala | 110 ++
.../streaming/dstream/NetworkInputDStream.scala | 18 +-
.../streaming/dstream/RawInputDStream.scala | 4 +-
.../streaming/dstream/TransformedDStream.scala | 20 +-
.../streaming/receivers/ActorReceiver.scala | 4 +-
.../apache/spark/streaming/JavaAPISuite.java | 425 ++++++-
.../apache/spark/streaming/JavaTestUtils.scala | 36 +-
.../spark/streaming/BasicOperationsSuite.scala | 141 ++-
.../spark/streaming/CheckpointSuite.scala | 4 +-
.../spark/streaming/InputStreamsSuite.scala | 91 +-
.../apache/spark/streaming/TestSuiteBase.scala | 61 +-
.../tools/JavaAPICompletenessChecker.scala | 4 +-
yarn/pom.xml | 50 +
.../spark/deploy/yarn/ApplicationMaster.scala | 55 +-
.../org/apache/spark/deploy/yarn/Client.scala | 258 ++++-
.../spark/deploy/yarn/ClientArguments.scala | 25 +-
.../yarn/ClientDistributedCacheManager.scala | 228 ++++
.../spark/deploy/yarn/WorkerRunnable.scala | 83 +-
.../deploy/yarn/YarnAllocationHandler.scala | 4 +-
.../ClientDistributedCacheManagerSuite.scala | 220 ++++
272 files changed, 11974 insertions(+), 5805 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6590e97,035942a..a686b53
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@@ -60,9 -60,9 +60,9 @@@ private[spark] class MapOutputTracker e
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
// Set to the MapOutputTrackerActor living on the driver
- var trackerActor: ActorRef = _
+ var trackerActor: Either[ActorRef, ActorSelection] = _
- private var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
+ protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
// Incremented every time a fetch fails so that client nodes know to clear
// their cache of map output locations if this happens.
@@@ -77,13 -74,9 +74,13 @@@
// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
- def askTracker(message: Any): Any = {
+ private def askTracker(message: Any): Any = {
try {
- val future = trackerActor.ask(message)(timeout)
+ val future = if (trackerActor.isLeft ) {
+ trackerActor.left.get.ask(message)(timeout)
+ } else {
+ trackerActor.right.get.ask(message)(timeout)
+ }
return Await.result(future, timeout)
} catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 000d1ee,572fc34..070f10f
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@@ -45,30 -47,72 +47,71 @@@ private[spark] class Client
listener: ClientListener)
extends Logging {
+ val REGISTRATION_TIMEOUT = 20.seconds
+ val REGISTRATION_RETRIES = 3
+
++ var prevMaster: ActorRef = null // set for unwatching, when it fails.
var actor: ActorRef = null
var appId: String = null
+ var registered = false
+ var activeMasterUrl: String = null
class ClientActor extends Actor with Logging {
- var master: ActorRef = null
- var masterAddress: Address = null
+ var master: ActorSelection = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
+ var alreadyDead = false // To avoid calling listener.dead() multiple times
override def preStart() {
- logInfo("Connecting to master " + masterUrl)
try {
- master = context.actorSelection(Master.toAkkaUrl(masterUrl))
- master ! RegisterApplication(appDescription)
+ registerWithMaster()
} catch {
case e: Exception =>
- logError("Failed to connect to master", e)
+ logWarning("Failed to connect to master", e)
markDisconnected()
context.stop(self)
}
}
+ def tryRegisterAllMasters() {
+ for (masterUrl <- masterUrls) {
+ logInfo("Connecting to master " + masterUrl + "...")
- val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
++ val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+ actor ! RegisterApplication(appDescription)
+ }
+ }
+
+ def registerWithMaster() {
+ tryRegisterAllMasters()
+
+ import context.dispatcher
+ var retries = 0
+ lazy val retryTimer: Cancellable =
+ context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+ retries += 1
+ if (registered) {
+ retryTimer.cancel()
+ } else if (retries >= REGISTRATION_RETRIES) {
+ logError("All masters are unresponsive! Giving up.")
+ markDead()
+ } else {
+ tryRegisterAllMasters()
+ }
+ }
+ retryTimer // start timer
+ }
+
+ def changeMaster(url: String) {
+ activeMasterUrl = url
- master = context.actorFor(Master.toAkkaUrl(url))
- masterAddress = master.path.address
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
++ master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+ }
+
override def receive = {
- case RegisteredApplication(appId_) =>
+ case RegisteredApplication(appId_, masterUrl) =>
+ context.watch(sender)
++ prevMaster = sender
appId = appId_
+ registered = true
+ changeMaster(masterUrl)
listener.connected(appId)
case ApplicationRemoved(message) =>
@@@ -89,13 -133,27 +132,19 @@@
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
}
+ case MasterChanged(masterUrl, masterWebUiUrl) =>
+ logInfo("Master has changed, new master is at " + masterUrl)
- context.unwatch(master)
++ context.unwatch(prevMaster)
+ changeMaster(masterUrl)
+ alreadyDisconnected = false
+ sender ! MasterChangeAcknowledged(appId)
+
- case Terminated(actor_) if actor_ == master =>
- logWarning("Connection to master failed; waiting for master to reconnect...")
- markDisconnected()
-
- case DisassociatedEvent(_, address, _) if address == masterAddress =>
- logWarning("Connection to master failed; waiting for master to reconnect...")
- markDisconnected()
-
- case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
- logWarning("Connection to master failed; waiting for master to reconnect...")
+ case Terminated(actor_) =>
- logError(s"Connection to $actor_ dropped, stopping client")
++ logWarning(s"Connection to $actor_ failed; waiting for master to reconnect...")
markDisconnected()
- context.stop(self)
case StopClient =>
- markDisconnected()
+ markDead()
sender ! true
context.stop(self)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index cb0fe6a,7db5097..a7cfc25
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@@ -94,11 -100,11 +100,10 @@@ private[spark] class Master(host: Strin
val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
- logInfo("Starting Spark master at spark://" + host + ":" + port)
+ logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
-- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
- import context.dispatcher
+ masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
masterMetricsSystem.registerSource(masterSource)
@@@ -113,27 -144,56 +143,56 @@@
}
override def receive = {
- case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
+ case ElectedLeader => {
+ val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
+ state = if (storedApps.isEmpty && storedWorkers.isEmpty)
+ RecoveryState.ALIVE
+ else
+ RecoveryState.RECOVERING
+
+ logInfo("I have been elected leader! New state: " + state)
+
+ if (state == RecoveryState.RECOVERING) {
+ beginRecovery(storedApps, storedWorkers)
+ context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+ }
+ }
+
+ case RevokedLeadership => {
+ logError("Leadership has been revoked -- master shutting down.")
+ System.exit(0)
+ }
+
+ case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
- if (idToWorker.contains(id)) {
+ if (state == RecoveryState.STANDBY) {
+ // ignore, don't send response
+ } else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
- context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
+ val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+ registerWorker(worker)
- context.watch(sender) // This doesn't work with remote actors but helps for testing
++ context.watch(sender)
+ persistenceEngine.addWorker(worker)
+ sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
}
}
case RegisterApplication(description) => {
- logInfo("Registering app " + description.name)
- val app = addApplication(description, sender)
- logInfo("Registered app " + description.name + " with ID " + app.id)
- waitingApps += app
- context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredApplication(app.id)
- schedule()
+ if (state == RecoveryState.STANDBY) {
+ // ignore, don't send response
+ } else {
+ logInfo("Registering app " + description.name)
+ val app = createApplication(description, sender)
+ registerApplication(app)
+ logInfo("Registered app " + description.name + " with ID " + app.id)
- context.watch(sender) // This doesn't work with remote actors but helps for testing
++ context.watch(sender)
+ persistenceEngine.addApplication(app)
+ sender ! RegisteredApplication(app.id, masterUrl)
+ schedule()
+ }
}
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
@@@ -178,22 -270,26 +269,12 @@@
// those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(finishApplication)
- }
-
- case DisassociatedEvent(_, address, _) => {
- // The disconnected client could've been either a worker or an app; remove whichever it was
- addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(finishApplication)
- }
-
- case AssociationErrorEvent(_, _, address, _) => {
- // The disconnected client could've been either a worker or an app; remove whichever it was
- addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(finishApplication)
+ if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
- case DisassociatedEvent(_, address, _) => {
- // The disconnected client could've been either a worker or an app; remove whichever it was
- addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(finishApplication)
- if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
- }
-
- case AssociationErrorEvent(_, _, address, _) => {
- // The disconnected client could've been either a worker or an app; remove whichever it was
- addressToWorker.get(address).foreach(removeWorker)
- addressToApp.get(address).foreach(finishApplication)
- if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
- }
-
case RequestMasterState => {
- sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+ sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+ state)
}
case CheckForWorkerTimeOut => {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 400d6f2,07189ac..9472c9a
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@@ -25,27 -25,19 +25,27 @@@ import scala.collection.mutable.HashMa
import scala.concurrent.duration._
import akka.actor._
- import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
-
-import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
-
import org.apache.spark.Logging
- import org.apache.spark.deploy.ExecutorState
+ import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
+import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
+import org.apache.spark.deploy.DeployMessages.KillExecutor
+import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import scala.Some
+import org.apache.spark.deploy.DeployMessages.Heartbeat
+import org.apache.spark.deploy.DeployMessages.RegisteredWorker
+import akka.remote.DisassociatedEvent
+import org.apache.spark.deploy.DeployMessages.LaunchExecutor
+import org.apache.spark.deploy.DeployMessages.RegisterWorker
-
+ /**
+ * @param masterUrls Each url should look like spark://host:port.
+ */
private[spark] class Worker(
host: String,
port: Int,
@@@ -64,8 -57,18 +65,19 @@@
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+ val REGISTRATION_TIMEOUT = 20.seconds
+ val REGISTRATION_RETRIES = 3
+
+ // Index into masterUrls that we're currently trying to register with.
+ var masterIndex = 0
+
+ val masterLock: Object = new Object()
- var master: ActorRef = null
+ var master: ActorSelection = null
- var masterWebUiUrl : String = ""
++ var prevMaster: ActorRef = null
+ var activeMasterUrl: String = ""
+ var activeMasterWebUiUrl : String = ""
+ @volatile var registered = false
+ @volatile var connected = false
val workerId = generateWorkerId()
var sparkHome: File = null
var workDir: File = null
@@@ -119,40 -123,92 +132,93 @@@
metricsSystem.start()
}
- def connectToMaster() {
- logInfo("Connecting to master " + masterUrl)
- master = context.actorSelection(Master.toAkkaUrl(masterUrl))
- master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
+ def changeMaster(url: String, uiUrl: String) {
+ masterLock.synchronized {
+ activeMasterUrl = url
+ activeMasterWebUiUrl = uiUrl
- master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
- context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
++ master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+ connected = true
+ }
}
- import context.dispatcher
+ def tryRegisterAllMasters() {
+ for (masterUrl <- masterUrls) {
+ logInfo("Connecting to master " + masterUrl + "...")
- val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
++ val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+ actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get,
+ publicAddress)
+ }
+ }
+
+ def registerWithMaster() {
+ tryRegisterAllMasters()
+
+ var retries = 0
+ lazy val retryTimer: Cancellable =
+ context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+ retries += 1
+ if (registered) {
+ retryTimer.cancel()
+ } else if (retries >= REGISTRATION_RETRIES) {
+ logError("All masters are unresponsive! Giving up.")
+ System.exit(1)
+ } else {
+ tryRegisterAllMasters()
+ }
+ }
+ retryTimer // start timer
+ }
override def receive = {
- case RegisteredWorker(url) =>
- masterWebUiUrl = url
- logInfo("Successfully registered with master")
+ case RegisteredWorker(masterUrl, masterWebUiUrl) =>
+ logInfo("Successfully registered with master " + masterUrl)
+ registered = true
+ context.watch(sender) // remote death watch for master
- //TODO: Is heartbeat really necessary akka does it anyway !
- context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
- master ! Heartbeat(workerId)
++ prevMaster = sender
+ changeMaster(masterUrl, masterWebUiUrl)
+ context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+
+ case SendHeartbeat =>
+ masterLock.synchronized {
+ if (connected) { master ! Heartbeat(workerId) }
}
+ case MasterChanged(masterUrl, masterWebUiUrl) =>
+ logInfo("Master has changed, new master is at " + masterUrl)
- context.unwatch(master)
++ context.unwatch(prevMaster)
++ prevMaster = sender
+ changeMaster(masterUrl, masterWebUiUrl)
+
+ val execs = executors.values.
+ map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
+ sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+
case RegisterWorkerFailed(message) =>
- logError("Worker registration failed: " + message)
- System.exit(1)
-
- case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
- logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- val manager = new ExecutorRunner(
- appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
- executors(appId + "/" + execId) = manager
- manager.start()
- coresUsed += cores_
- memoryUsed += memory_
- master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
+ if (!registered) {
+ logError("Worker registration failed: " + message)
+ System.exit(1)
+ }
+
+ case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+ if (masterUrl != activeMasterUrl) {
+ logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+ } else {
+ logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+ val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+ self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
+ executors(appId + "/" + execId) = manager
+ manager.start()
+ coresUsed += cores_
+ memoryUsed += memory_
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ }
+ }
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
- master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+ }
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
val executor = executors(fullId)
@@@ -165,18 -221,21 +231,22 @@@
memoryUsed -= executor.memory
}
- case KillExecutor(appId, execId) =>
- val fullId = appId + "/" + execId
- executors.get(fullId) match {
- case Some(executor) =>
- logInfo("Asked to kill executor " + fullId)
- executor.kill()
- case None =>
- logInfo("Asked to kill unknown executor " + fullId)
+ case KillExecutor(masterUrl, appId, execId) =>
+ if (masterUrl != activeMasterUrl) {
+ logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
+ } else {
+ val fullId = appId + "/" + execId
+ executors.get(fullId) match {
+ case Some(executor) =>
+ logInfo("Asked to kill executor " + fullId)
+ executor.kill()
+ case None =>
+ logInfo("Asked to kill unknown executor " + fullId)
+ }
}
- case DisassociatedEvent(_, address, _) if address == master.path.address =>
+ case Terminated(actor_) =>
+ logInfo(s"$actor_ terminated !")
masterDisconnected()
case RequestWorkerState => {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 0000000,50302fc..16d8f81
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@@ -1,0 -1,126 +1,126 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.executor
+
+ import java.nio.ByteBuffer
+
+ import akka.actor._
+ import akka.remote._
+
+ import org.apache.spark.{Logging, SparkEnv}
+ import org.apache.spark.TaskState.TaskState
+ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+ import org.apache.spark.util.{Utils, AkkaUtils}
+ import akka.remote.DisassociatedEvent
+ import akka.remote.AssociationErrorEvent
+ import akka.remote.DisassociatedEvent
+ import akka.actor.Terminated
+
+
+ private[spark] class CoarseGrainedExecutorBackend(
+ driverUrl: String,
+ executorId: String,
+ hostPort: String,
+ cores: Int)
+ extends Actor
+ with ExecutorBackend
+ with Logging {
+
+ Utils.checkHostPort(hostPort, "Expected hostport")
+
+ var executor: Executor = null
+ var driver: ActorSelection = null
+
+ override def preStart() {
+ logInfo("Connecting to driver: " + driverUrl)
+ driver = context.actorSelection(driverUrl)
+ driver ! RegisterExecutor(executorId, hostPort, cores)
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- // context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ }
+
+ override def receive = {
+ case RegisteredExecutor(sparkProperties) =>
+ logInfo("Successfully registered with driver")
++ context.watch(sender) //Start watching for terminated messages.
+ // Make this host instead of hostPort ?
+ executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
+
+ case RegisterExecutorFailed(message) =>
+ logError("Slave registration failed: " + message)
+ System.exit(1)
+
+ case LaunchTask(taskDesc) =>
+ logInfo("Got assigned task " + taskDesc.taskId)
+ if (executor == null) {
+ logError("Received LaunchTask command but executor was null")
+ System.exit(1)
+ } else {
+ executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+ }
+
+ case KillTask(taskId, _) =>
+ if (executor == null) {
+ logError("Received KillTask command but executor was null")
+ System.exit(1)
+ } else {
+ executor.killTask(taskId)
+ }
+
- case DisassociatedEvent(_, _, _) =>
- logError("Driver terminated or disconnected! Shutting down.")
++ case Terminated(actor) =>
++ logError(s"Driver $actor terminated or disconnected! Shutting down.")
+ System.exit(1)
+
+ case StopExecutor =>
+ logInfo("Driver commanded a shutdown")
+ context.stop(self)
+ context.system.shutdown()
+ }
+
+ override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+ driver ! StatusUpdate(executorId, taskId, state, data)
+ }
+ }
+
+ private[spark] object CoarseGrainedExecutorBackend {
+ def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
+ // Debug code
+ Utils.checkHost(hostname)
+
+ // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
+ // before getting started with all our system properties, etc
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+ // set it
+ val sparkHostPort = hostname + ":" + boundPort
+ System.setProperty("spark.hostPort", sparkHostPort)
+ actorSystem.actorOf(
+ Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+ name = "Executor")
+ actorSystem.awaitTermination()
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ //the reason we allow the last appid argument is to make it easy to kill rogue executors
+ System.err.println(
+ "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " +
+ "[<appid>]")
+ System.exit(1)
+ }
+ run(args(0), args(1), args(2), args(3).toInt)
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0000000,3ccc38d..03cf1e2
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@@ -1,0 -1,238 +1,233 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.scheduler.cluster
+
+ import java.util.concurrent.atomic.AtomicInteger
+
+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+ import scala.concurrent.Await
+ import scala.concurrent.duration._
+
+ import akka.actor._
+ import akka.pattern.ask
+ import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+
+ import org.apache.spark.{SparkException, Logging, TaskState}
+ import org.apache.spark.scheduler.TaskDescription
+ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+ import org.apache.spark.util.Utils
+
+ /**
+ * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
+ * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
+ * executors whenever a task is done and asking the scheduler to launch a new executor for
+ * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
+ * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
+ * (spark.deploy.*).
+ */
+ private[spark]
+ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+ extends SchedulerBackend with Logging
+ {
+ // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
+ var totalCoreCount = new AtomicInteger(0)
+
+ class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+ private val executorActor = new HashMap[String, ActorRef]
+ private val executorAddress = new HashMap[String, Address]
+ private val executorHost = new HashMap[String, String]
+ private val freeCores = new HashMap[String, Int]
+ private val actorToExecutorId = new HashMap[ActorRef, String]
+ private val addressToExecutorId = new HashMap[Address, String]
+
+ override def preStart() {
+ // Listen for remote client disconnection events, since they don't go through Akka's watch()
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+
+ // Periodically revive offers to allow delay scheduling to work
+ val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
+ import context.dispatcher
+ context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
+ }
+
+ def receive = {
+ case RegisterExecutor(executorId, hostPort, cores) =>
+ Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
+ if (executorActor.contains(executorId)) {
+ sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
+ } else {
+ logInfo("Registered executor: " + sender + " with ID " + executorId)
+ sender ! RegisteredExecutor(sparkProperties)
+ context.watch(sender)
+ executorActor(executorId) = sender
+ executorHost(executorId) = Utils.parseHostPort(hostPort)._1
+ freeCores(executorId) = cores
+ executorAddress(executorId) = sender.path.address
+ actorToExecutorId(sender) = executorId
+ addressToExecutorId(sender.path.address) = executorId
+ totalCoreCount.addAndGet(cores)
+ makeOffers()
+ }
+
+ case StatusUpdate(executorId, taskId, state, data) =>
+ scheduler.statusUpdate(taskId, state, data.value)
+ if (TaskState.isFinished(state)) {
+ if (executorActor.contains(executorId)) {
+ freeCores(executorId) += 1
+ makeOffers(executorId)
+ } else {
+ // Ignoring the update since we don't know about the executor.
+ val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
+ logWarning(msg.format(taskId, state, sender, executorId))
+ }
+ }
+
+ case ReviveOffers =>
+ makeOffers()
+
+ case KillTask(taskId, executorId) =>
+ executorActor(executorId) ! KillTask(taskId, executorId)
+
+ case StopDriver =>
+ sender ! true
+ context.stop(self)
+
+ case StopExecutors =>
+ logInfo("Asking each executor to shut down")
+ for (executor <- executorActor.values) {
+ executor ! StopExecutor
+ }
+ sender ! true
+
+ case RemoveExecutor(executorId, reason) =>
+ removeExecutor(executorId, reason)
+ sender ! true
+
+ case Terminated(actor) =>
+ actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
+
- case DisassociatedEvent(_, remoteAddress, _) =>
- addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
-
- case AssociationErrorEvent(_, _, remoteAddress, _) =>
- addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
+ }
+
+ // Make fake resource offers on all executors
+ def makeOffers() {
+ launchTasks(scheduler.resourceOffers(
+ executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+ }
+
+ // Make fake resource offers on just one executor
+ def makeOffers(executorId: String) {
+ launchTasks(scheduler.resourceOffers(
+ Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+ }
+
+ // Launch tasks returned by a set of resource offers
+ def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+ for (task <- tasks.flatten) {
+ freeCores(task.executorId) -= 1
+ executorActor(task.executorId) ! LaunchTask(task)
+ }
+ }
+
+ // Remove a disconnected slave from the cluster
+ def removeExecutor(executorId: String, reason: String) {
+ if (executorActor.contains(executorId)) {
+ logInfo("Executor " + executorId + " disconnected, so removing it")
+ val numCores = freeCores(executorId)
+ actorToExecutorId -= executorActor(executorId)
+ addressToExecutorId -= executorAddress(executorId)
+ executorActor -= executorId
+ executorHost -= executorId
+ freeCores -= executorId
+ totalCoreCount.addAndGet(-numCores)
+ scheduler.executorLost(executorId, SlaveLost(reason))
+ }
+ }
+ }
+
+ var driverActor: ActorRef = null
+ val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
+ override def start() {
+ val properties = new ArrayBuffer[(String, String)]
+ val iterator = System.getProperties.entrySet.iterator
+ while (iterator.hasNext) {
+ val entry = iterator.next
+ val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+ if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
+ properties += ((key, value))
+ }
+ }
+ driverActor = actorSystem.actorOf(
+ Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ }
+
+ private val timeout = {
+ Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ }
+
+ def stopExecutors() {
+ try {
+ if (driverActor != null) {
+ logInfo("Shutting down all executors")
+ val future = driverActor.ask(StopExecutors)(timeout)
+ Await.ready(future, timeout)
+ }
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error asking standalone scheduler to shut down executors", e)
+ }
+ }
+
+ override def stop() {
+ try {
+ if (driverActor != null) {
+ val future = driverActor.ask(StopDriver)(timeout)
+ Await.ready(future, timeout)
+ }
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error stopping standalone scheduler's driver actor", e)
+ }
+ }
+
+ override def reviveOffers() {
+ driverActor ! ReviveOffers
+ }
+
+ override def killTask(taskId: Long, executorId: String) {
+ driverActor ! KillTask(taskId, executorId)
+ }
+
+ override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+ .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
+
+ // Called by subclasses when notified of a lost worker
+ def removeExecutor(executorId: String, reason: String) {
+ try {
+ val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
+ Await.ready(future, timeout)
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+ }
+ }
+ }
+
+ private[spark] object CoarseGrainedSchedulerBackend {
+ val ACTOR_NAME = "CoarseGrainedScheduler"
+ }
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 955f6cd,fd17460..271dc90
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@@ -48,15 -48,15 +48,15 @@@ class MapOutputTrackerSuite extends Fun
test("master start and stop") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTracker()
- tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker))))
+ val tracker = new MapOutputTrackerMaster()
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
++ tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.stop()
}
test("master register and fetch") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTracker()
- tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker))))
+ val tracker = new MapOutputTrackerMaster()
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
++ tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@@ -74,8 -74,8 +74,8 @@@
test("master register and unregister and fetch") {
val actorSystem = ActorSystem("test")
- val tracker = new MapOutputTracker()
- tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker))))
+ val tracker = new MapOutputTrackerMaster()
- tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
++ tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@@ -102,9 -100,9 +100,9 @@@
System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
- val masterTracker = new MapOutputTracker()
+ val masterTracker = new MapOutputTrackerMaster()
- masterTracker.trackerActor = actorSystem.actorOf(
- Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+ masterTracker.trackerActor = Left(actorSystem.actorOf(
- Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker"))
++ Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
val slaveTracker = new MapOutputTracker()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/199e9cf0/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------