You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/12 08:12:18 UTC
[47/50] git commit: Merge branch 'master' into akka-bug-fix
Merge branch 'master' into akka-bug-fix
Conflicts:
core/pom.xml
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
pom.xml
project/SparkBuild.scala
streaming/pom.xml
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/603af51b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/603af51b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/603af51b
Branch: refs/heads/scala-2.10
Commit: 603af51bb5257744ce0db28e7f10db6a2ba899ec
Parents: 17db6a9 d2efe13
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Wed Dec 11 10:21:53 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Wed Dec 11 10:21:53 2013 +0530
----------------------------------------------------------------------
README.md | 8 +-
core/pom.xml | 12 +-
.../scala/org/apache/spark/FutureAction.scala | 2 +-
.../org/apache/spark/MapOutputTracker.scala | 8 +-
.../scala/org/apache/spark/SparkContext.scala | 236 +++----
.../main/scala/org/apache/spark/rdd/RDD.scala | 10 +-
.../apache/spark/rdd/ZippedPartitionsRDD.scala | 27 +-
.../apache/spark/scheduler/DAGScheduler.scala | 382 ++++++++---
.../spark/scheduler/DAGSchedulerEvent.scala | 5 +-
.../org/apache/spark/scheduler/JobWaiter.scala | 1 +
.../apache/spark/scheduler/SparkListener.scala | 2 +-
.../scheduler/cluster/ClusterScheduler.scala | 8 +-
.../cluster/ClusterTaskSetManager.scala | 2 +-
.../spark/scheduler/local/LocalScheduler.scala | 29 +-
.../spark/storage/ShuffleBlockManager.scala | 2 +-
.../org/apache/spark/storage/StorageLevel.scala | 2 +-
.../spark/storage/StoragePerfTester.scala | 17 +
.../org/apache/spark/ui/jobs/StagePage.scala | 65 +-
.../org/apache/spark/JobCancellationSuite.scala | 4 +-
.../SparkContextSchedulerCreationSuite.scala | 140 ++++
.../deploy/worker/ExecutorRunnerTest.scala | 17 +
.../apache/spark/rdd/AsyncRDDActionsSuite.scala | 26 +
.../spark/scheduler/DAGSchedulerSuite.scala | 43 +-
.../spark/storage/DiskBlockManagerSuite.scala | 31 +-
.../util/collection/OpenHashMapSuite.scala | 17 +
.../util/collection/OpenHashSetSuite.scala | 17 +
.../PrimitiveKeyOpenHashMapSuite.scala | 17 +
docs/_layouts/global.html | 8 +-
docs/bagel-programming-guide.md | 2 +-
docs/building-with-maven.md | 6 +
docs/cluster-overview.md | 2 +-
docs/configuration.md | 36 +-
docs/hadoop-third-party-distributions.md | 3 +-
docs/index.md | 8 +-
docs/job-scheduling.md | 2 +-
docs/running-on-yarn.md | 8 +
docs/spark-standalone.md | 4 +-
docs/streaming-programming-guide.md | 8 +-
new-yarn/pom.xml | 161 +++++
.../spark/deploy/yarn/ApplicationMaster.scala | 446 ++++++++++++
.../yarn/ApplicationMasterArguments.scala | 94 +++
.../org/apache/spark/deploy/yarn/Client.scala | 519 ++++++++++++++
.../spark/deploy/yarn/ClientArguments.scala | 148 ++++
.../yarn/ClientDistributedCacheManager.scala | 228 ++++++
.../spark/deploy/yarn/WorkerLauncher.scala | 223 ++++++
.../spark/deploy/yarn/WorkerRunnable.scala | 209 ++++++
.../deploy/yarn/YarnAllocationHandler.scala | 687 +++++++++++++++++++
.../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 43 ++
.../cluster/YarnClientClusterScheduler.scala | 47 ++
.../cluster/YarnClientSchedulerBackend.scala | 109 +++
.../cluster/YarnClusterScheduler.scala | 55 ++
.../ClientDistributedCacheManagerSuite.scala | 220 ++++++
pom.xml | 61 +-
project/SparkBuild.scala | 30 +-
python/pyspark/rdd.py | 5 +-
python/pyspark/tests.py | 15 +
python/test_support/userlibrary.py | 17 +
repl-bin/src/deb/bin/spark-executor | 2 +-
repl-bin/src/deb/bin/spark-shell | 2 +-
streaming/pom.xml | 9 +-
.../spark/deploy/yarn/ApplicationMaster.scala | 172 ++---
.../org/apache/spark/deploy/yarn/Client.scala | 151 ++--
.../spark/deploy/yarn/WorkerRunnable.scala | 85 ++-
.../deploy/yarn/YarnAllocationHandler.scala | 346 ++++++----
64 files changed, 4656 insertions(+), 645 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/README.md
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/pom.xml
----------------------------------------------------------------------
diff --cc core/pom.xml
index 71bf15c,38f4be1..3fe48fd
--- a/core/pom.xml
+++ b/core/pom.xml
@@@ -95,12 -95,20 +95,16 @@@
<version>0.3.1</version>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
+ <groupId>${akka.group}</groupId>
- <artifactId>akka-actor</artifactId>
++ <artifactId>akka-actor_2.10</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
- <artifactId>akka-remote</artifactId>
+ <artifactId>akka-remote_2.10</artifactId>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
+ <groupId>${akka.group}</groupId>
- <artifactId>akka-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scalap</artifactId>
+ <artifactId>akka-slf4j_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f80d3d6,893708f..ea45566
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@@ -938,9 -940,9 +938,9 @@@ abstract class RDD[T: ClassTag]
private var storageLevel: StorageLevel = StorageLevel.NONE
/** Record user function generating this RDD. */
- private[spark] val origin = Utils.formatSparkCallSite
+ @transient private[spark] val origin = Utils.formatSparkCallSite
- private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
+ private[spark] def elementClassTag: ClassTag[T] = classTag[T]
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 201572d,f9cd021..963d15b
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@@ -180,6 -158,56 +160,57 @@@ class DAGScheduler
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
+ /**
+ * Starts the event processing actor. The actor has two responsibilities:
+ *
+ * 1. Waits for events like job submission, task finished, task failure etc., and calls
+ * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
+ * 2. Schedules a periodical task to resubmit failed stages.
+ *
+ * NOTE: the actor cannot be started in the constructor, because the periodical task references
+ * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus
+ * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
+ */
+ def start() {
+ eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
+ /**
+ * A handle to the periodical task, used to cancel the task when the actor is stopped.
+ */
+ var resubmissionTask: Cancellable = _
+
+ override def preStart() {
++ import context.dispatcher
+ /**
+ * A message is sent to the actor itself periodically to remind the actor to resubmit failed
+ * stages. In this way, stage resubmission can be done within the same thread context of
+ * other event processing logic to avoid unnecessary synchronization overhead.
+ */
+ resubmissionTask = context.system.scheduler.schedule(
- RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
++ RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
+ }
+
+ /**
+ * The main event loop of the DAG scheduler.
+ */
+ def receive = {
+ case event: DAGSchedulerEvent =>
+ logDebug("Got event of type " + event.getClass.getName)
+
+ /**
+ * All events are forwarded to `processEvent()`, so that the event processing logic can
+ * easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite`
+ * for details.
+ */
+ if (!processEvent(event)) {
+ submitWaitingStages()
+ } else {
+ resubmissionTask.cancel()
+ context.stop(self)
+ }
+ }
+ }))
+ }
+
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/docs/configuration.md
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 979fd0c,9348c77..6906ad2
--- a/pom.xml
+++ b/pom.xml
@@@ -99,11 -99,12 +99,13 @@@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <java.version>1.5</java.version>
- <scala.version>2.9.3</scala.version>
+ <java.version>1.6</java.version>
+
+ <scala.version>2.10.3</scala.version>
<mesos.version>0.13.0</mesos.version>
+ <akka.version>2.2.3</akka.version>
+ <akka.group>com.typesafe.akka</akka.group>
- <akka.version>2.0.5</akka.version>
+ <protobuf.version>2.4.1</protobuf.version>
<slf4j.version>1.7.2</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<hadoop.version>1.0.4</hadoop.version>
@@@ -213,8 -260,8 +215,8 @@@
<version>0.3.1</version>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
+ <groupId>${akka.group}</groupId>
- <artifactId>akka-actor</artifactId>
+ <artifactId>akka-actor_2.10</artifactId>
<version>${akka.version}</version>
<exclusions>
<exclusion>
@@@ -224,8 -271,8 +226,8 @@@
</exclusions>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
+ <groupId>${akka.group}</groupId>
- <artifactId>akka-remote</artifactId>
+ <artifactId>akka-remote_2.10</artifactId>
<version>${akka.version}</version>
<exclusions>
<exclusion>
@@@ -235,8 -282,8 +237,8 @@@
</exclusions>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
+ <groupId>${akka.group}</groupId>
- <artifactId>akka-slf4j</artifactId>
+ <artifactId>akka-slf4j_2.10</artifactId>
<version>${akka.version}</version>
<exclusions>
<exclusion>
@@@ -246,6 -293,17 +248,17 @@@
</exclusions>
</dependency>
<dependency>
+ <groupId>${akka.group}</groupId>
- <artifactId>akka-zeromq</artifactId>
++ <artifactId>akka-zeromq_2.10</artifactId>
+ <version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>6.4.4</version>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/project/SparkBuild.scala
----------------------------------------------------------------------
diff --cc project/SparkBuild.scala
index 3584e88,ac87cff..ea7bf96
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@@ -200,36 -218,38 +216,36 @@@ object SparkBuild extends Build
),
libraryDependencies ++= Seq(
- "com.google.guava" % "guava" % "14.0.1",
- "com.google.code.findbugs" % "jsr305" % "1.3.9",
- "log4j" % "log4j" % "1.2.17",
- "org.slf4j" % "slf4j-api" % slf4jVersion,
- "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
- "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
- "com.ning" % "compress-lzf" % "0.8.4",
- "org.xerial.snappy" % "snappy-java" % "1.0.5",
- "org.ow2.asm" % "asm" % "4.0",
- "com.google.protobuf" % "protobuf-java" % protobufVersion,
- akkaGroup % "akka-actor" % akkaVersion excludeAll(excludeNetty),
- akkaGroup % "akka-remote" % akkaVersion excludeAll(excludeNetty),
- akkaGroup % "akka-slf4j" % akkaVersion excludeAll(excludeNetty),
- "it.unimi.dsi" % "fastutil" % "6.4.4",
- "colt" % "colt" % "1.2.0",
- "net.liftweb" % "lift-json_2.9.2" % "2.5",
- "org.apache.mesos" % "mesos" % "0.13.0",
- "io.netty" % "netty-all" % "4.0.0.Beta2",
- "org.apache.derby" % "derby" % "10.4.2.0" % "test",
- "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
- "net.java.dev.jets3t" % "jets3t" % "0.7.1",
- "org.apache.avro" % "avro" % "1.7.4",
- "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
- "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
- "com.codahale.metrics" % "metrics-core" % "3.0.0",
- "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
- "com.codahale.metrics" % "metrics-json" % "3.0.0",
- "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
- "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
- "com.twitter" % "chill_2.9.3" % "0.3.1",
- "com.twitter" % "chill-java" % "0.3.1"
- )
+ "com.google.guava" % "guava" % "14.0.1",
+ "com.google.code.findbugs" % "jsr305" % "1.3.9",
+ "log4j" % "log4j" % "1.2.17",
+ "org.slf4j" % "slf4j-api" % slf4jVersion,
+ "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
+ "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
+ "com.ning" % "compress-lzf" % "0.8.4",
+ "org.xerial.snappy" % "snappy-java" % "1.0.5",
+ "org.ow2.asm" % "asm" % "4.0",
+ "com.google.protobuf" % "protobuf-java" % "2.4.1",
- "com.typesafe.akka" %% "akka-remote" % "2.2.3" excludeAll(excludeNetty),
- "com.typesafe.akka" %% "akka-slf4j" % "2.2.3" excludeAll(excludeNetty),
++ akkaGroup %% "akka-remote" % "2.2.3" excludeAll(excludeNetty),
++ akkaGroup %% "akka-slf4j" % "2.2.3" excludeAll(excludeNetty),
+ "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
+ "it.unimi.dsi" % "fastutil" % "6.4.4",
+ "colt" % "colt" % "1.2.0",
+ "org.apache.mesos" % "mesos" % "0.13.0",
+ "net.java.dev.jets3t" % "jets3t" % "0.7.1",
+ "org.apache.derby" % "derby" % "10.4.2.0" % "test",
+ "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+ "org.apache.avro" % "avro" % "1.7.4",
+ "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
+ "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
+ "com.codahale.metrics" % "metrics-core" % "3.0.0",
+ "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
+ "com.codahale.metrics" % "metrics-json" % "3.0.0",
+ "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
+ "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
+ "com.twitter" %% "chill" % "0.3.1",
+ "com.twitter" % "chill-java" % "0.3.1"
+ )
)
def rootSettings = sharedSettings ++ Seq(
@@@ -291,11 -311,6 +307,11 @@@
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("net.sf.jopt-simple", "jopt-simple")
+ excludeAll(excludeNetty),
+ "org.eclipse.paho" % "mqtt-client" % "0.4.0",
+ "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
- "com.typesafe.akka" %% "akka-zeromq" % "2.2.3" excludeAll(excludeNetty)
++ akkaGroup %% "akka-zeromq" % "2.2.3" excludeAll(excludeNetty)
)
)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/python/pyspark/rdd.py
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/streaming/pom.xml
----------------------------------------------------------------------
diff --cc streaming/pom.xml
index 298bc83,4089293..e27b437
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@@ -110,15 -110,8 +110,8 @@@
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
- <groupId>com.typesafe.akka</groupId>
+ <groupId>${akka.group}</groupId>
- <artifactId>akka-zeromq</artifactId>
+ <artifactId>akka-zeromq_2.10</artifactId>
- <version>${akka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a6ce1b6,f15f3c7..9ab2073
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@@ -17,24 -17,30 +17,30 @@@
package org.apache.spark.deploy.yarn
+ import java.lang.{Boolean => JBoolean}
+ import java.util.{Collections, Set => JSet}
+ import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+ import java.util.concurrent.atomic.AtomicInteger
+
+ import scala.collection
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
import org.apache.spark.Logging
- import org.apache.spark.util.Utils
import org.apache.spark.scheduler.SplitInfo
- import scala.collection
- import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+ import org.apache.spark.util.Utils
+
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.yarn.api.AMRMProtocol
+ import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
+ import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+ import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
import org.apache.hadoop.yarn.util.{RackResolver, Records}
- import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
- import java.util.concurrent.atomic.AtomicInteger
- import org.apache.hadoop.yarn.api.AMRMProtocol
- import collection.JavaConversions._
- import collection.mutable.{ArrayBuffer, HashMap, HashSet}
- import org.apache.hadoop.conf.Configuration
- import java.util.{Collections, Set => JSet}
- import java.lang.{Boolean => JBoolean}
+
-object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
+object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
}
@@@ -209,9 -235,10 +235,10 @@@ private[yarn] class YarnAllocationHandl
numWorkersRunning.decrementAndGet()
}
else {
- // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter)
+ // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
+ // (workerIdCounter)
val workerId = workerIdCounter.incrementAndGet().toString
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)