Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/12 08:12:18 UTC

[47/50] git commit: Merge branch 'master' into akka-bug-fix

Merge branch 'master' into akka-bug-fix

Conflicts:
	core/pom.xml
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	pom.xml
	project/SparkBuild.scala
	streaming/pom.xml
	yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/603af51b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/603af51b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/603af51b

Branch: refs/heads/scala-2.10
Commit: 603af51bb5257744ce0db28e7f10db6a2ba899ec
Parents: 17db6a9 d2efe13
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Wed Dec 11 10:21:53 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Wed Dec 11 10:21:53 2013 +0530

----------------------------------------------------------------------
 README.md                                       |   8 +-
 core/pom.xml                                    |  12 +-
 .../scala/org/apache/spark/FutureAction.scala   |   2 +-
 .../org/apache/spark/MapOutputTracker.scala     |   8 +-
 .../scala/org/apache/spark/SparkContext.scala   | 236 +++----
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  10 +-
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  |  27 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 382 ++++++++---
 .../spark/scheduler/DAGSchedulerEvent.scala     |   5 +-
 .../org/apache/spark/scheduler/JobWaiter.scala  |   1 +
 .../apache/spark/scheduler/SparkListener.scala  |   2 +-
 .../scheduler/cluster/ClusterScheduler.scala    |   8 +-
 .../cluster/ClusterTaskSetManager.scala         |   2 +-
 .../spark/scheduler/local/LocalScheduler.scala  |  29 +-
 .../spark/storage/ShuffleBlockManager.scala     |   2 +-
 .../org/apache/spark/storage/StorageLevel.scala |   2 +-
 .../spark/storage/StoragePerfTester.scala       |  17 +
 .../org/apache/spark/ui/jobs/StagePage.scala    |  65 +-
 .../org/apache/spark/JobCancellationSuite.scala |   4 +-
 .../SparkContextSchedulerCreationSuite.scala    | 140 ++++
 .../deploy/worker/ExecutorRunnerTest.scala      |  17 +
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala |  26 +
 .../spark/scheduler/DAGSchedulerSuite.scala     |  43 +-
 .../spark/storage/DiskBlockManagerSuite.scala   |  31 +-
 .../util/collection/OpenHashMapSuite.scala      |  17 +
 .../util/collection/OpenHashSetSuite.scala      |  17 +
 .../PrimitiveKeyOpenHashMapSuite.scala          |  17 +
 docs/_layouts/global.html                       |   8 +-
 docs/bagel-programming-guide.md                 |   2 +-
 docs/building-with-maven.md                     |   6 +
 docs/cluster-overview.md                        |   2 +-
 docs/configuration.md                           |  36 +-
 docs/hadoop-third-party-distributions.md        |   3 +-
 docs/index.md                                   |   8 +-
 docs/job-scheduling.md                          |   2 +-
 docs/running-on-yarn.md                         |   8 +
 docs/spark-standalone.md                        |   4 +-
 docs/streaming-programming-guide.md             |   8 +-
 new-yarn/pom.xml                                | 161 +++++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 446 ++++++++++++
 .../yarn/ApplicationMasterArguments.scala       |  94 +++
 .../org/apache/spark/deploy/yarn/Client.scala   | 519 ++++++++++++++
 .../spark/deploy/yarn/ClientArguments.scala     | 148 ++++
 .../yarn/ClientDistributedCacheManager.scala    | 228 ++++++
 .../spark/deploy/yarn/WorkerLauncher.scala      | 223 ++++++
 .../spark/deploy/yarn/WorkerRunnable.scala      | 209 ++++++
 .../deploy/yarn/YarnAllocationHandler.scala     | 687 +++++++++++++++++++
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  43 ++
 .../cluster/YarnClientClusterScheduler.scala    |  47 ++
 .../cluster/YarnClientSchedulerBackend.scala    | 109 +++
 .../cluster/YarnClusterScheduler.scala          |  55 ++
 .../ClientDistributedCacheManagerSuite.scala    | 220 ++++++
 pom.xml                                         |  61 +-
 project/SparkBuild.scala                        |  30 +-
 python/pyspark/rdd.py                           |   5 +-
 python/pyspark/tests.py                         |  15 +
 python/test_support/userlibrary.py              |  17 +
 repl-bin/src/deb/bin/spark-executor             |   2 +-
 repl-bin/src/deb/bin/spark-shell                |   2 +-
 streaming/pom.xml                               |   9 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   | 172 ++---
 .../org/apache/spark/deploy/yarn/Client.scala   | 151 ++--
 .../spark/deploy/yarn/WorkerRunnable.scala      |  85 ++-
 .../deploy/yarn/YarnAllocationHandler.scala     | 346 ++++++----
 64 files changed, 4656 insertions(+), 645 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/README.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/pom.xml
----------------------------------------------------------------------
diff --cc core/pom.xml
index 71bf15c,38f4be1..3fe48fd
--- a/core/pom.xml
+++ b/core/pom.xml
@@@ -95,12 -95,20 +95,16 @@@
        <version>0.3.1</version>
      </dependency>
      <dependency>
-       <groupId>com.typesafe.akka</groupId>
+       <groupId>${akka.group}</groupId>
 -      <artifactId>akka-actor</artifactId>
++      <artifactId>akka-actor_2.10</artifactId>
+     </dependency>
+     <dependency>
+       <groupId>${akka.group}</groupId>
 -      <artifactId>akka-remote</artifactId>
 +      <artifactId>akka-remote_2.10</artifactId>
      </dependency>
      <dependency>
-       <groupId>com.typesafe.akka</groupId>
+       <groupId>${akka.group}</groupId>
 -      <artifactId>akka-slf4j</artifactId>
 -    </dependency>
 -    <dependency>
 -      <groupId>org.scala-lang</groupId>
 -      <artifactId>scalap</artifactId>
 +      <artifactId>akka-slf4j_2.10</artifactId>
      </dependency>
      <dependency>
        <groupId>org.scala-lang</groupId>
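
The hunk above swaps the hard-coded com.typesafe.akka group ID for the new ${akka.group} property and pins the Scala binary version into each artifact ID (akka-actor_2.10, and so on). Maven has no notion of Scala cross-versioning, so the _2.10 suffix has to be written out by hand; sbt's %% operator appends it automatically. A minimal build.sbt sketch of the contrast, not part of this commit, assuming the versions this commit uses:

    // Minimal sketch, not part of this commit.
    scalaVersion := "2.10.3"

    // %% appends the Scala binary version, resolving to akka-actor_2.10:
    libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.3"

    // Plain % takes the artifact ID literally, as a Maven pom must:
    libraryDependencies += "com.typesafe.akka" % "akka-actor_2.10" % "2.2.3"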

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f80d3d6,893708f..ea45566
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@@ -938,9 -940,9 +938,9 @@@ abstract class RDD[T: ClassTag]
    private var storageLevel: StorageLevel = StorageLevel.NONE
  
    /** Record user function generating this RDD. */
-   private[spark] val origin = Utils.formatSparkCallSite
+   @transient private[spark] val origin = Utils.formatSparkCallSite
  
 -  private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
 +  private[spark] def elementClassTag: ClassTag[T] = classTag[T]
  
    private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
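
The elementClassManifest -> elementClassTag rename above follows the Scala 2.10 reflection API, where ClassManifest is deprecated in favor of scala.reflect.ClassTag. A minimal sketch of the new idiom (the Box class is a hypothetical example, not Spark code):

    import scala.reflect.{ClassTag, classTag}

    // A context bound `T : ClassTag` keeps the erased class of T available
    // at runtime, e.g. for constructing an Array[T].
    class Box[T: ClassTag](val value: T) {
      def elementClassTag: ClassTag[T] = classTag[T]
      def newArray(n: Int): Array[T] = new Array[T](n)
    }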
  

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 201572d,f9cd021..963d15b
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@@ -180,6 -158,56 +160,57 @@@ class DAGScheduler
  
    val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
  
+   /**
+    * Starts the event processing actor.  The actor has two responsibilities:
+    *
+    * 1. Waits for events like job submission, task completion, and task failure, and calls
+    *    [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
+    * 2. Schedules a periodic task to resubmit failed stages.
+    *
+    * NOTE: the actor cannot be started in the constructor, because the periodic task references
+    * some internal state of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, and
+    * thus cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
+    */
+   def start() {
+     eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
+       /**
+        * A handle to the periodic task, used to cancel the task when the actor is stopped.
+        */
+       var resubmissionTask: Cancellable = _
+ 
+       override def preStart() {
++        import context.dispatcher
+         /**
+          * A message is sent to the actor itself periodically to remind the actor to resubmit failed
+          * stages.  In this way, stage resubmission can be done within the same thread context as
+          * other event processing logic, avoiding unnecessary synchronization overhead.
+          */
+         resubmissionTask = context.system.scheduler.schedule(
 -          RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
++          RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
+       }
+ 
+       /**
+        * The main event loop of the DAG scheduler.
+        */
+       def receive = {
+         case event: DAGSchedulerEvent =>
+           logDebug("Got event of type " + event.getClass.getName)
+ 
+           /**
+            * All events are forwarded to `processEvent()`, so that the event processing logic can
+            * be easily tested without starting a dedicated actor.  Please refer to
+            * `DAGSchedulerSuite` for details.
+            */
+           if (!processEvent(event)) {
+             submitWaitingStages()
+           } else {
+             resubmissionTask.cancel()
+             context.stop(self)
+           }
+       }
+     }))
+   }
+ 
    def addSparkListener(listener: SparkListener) {
      listenerBus.addListener(listener)
    }
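
The doc comment above describes a standard Akka pattern: the actor schedules a message to itself, so that periodic work (here, resubmitting failed stages) is handled on the actor's own thread like any other event and needs no extra synchronization. A self-contained sketch of the pattern against the Akka 2.2 API; the actor and message names are hypothetical:

    import akka.actor.{Actor, ActorSystem, Cancellable, Props}
    import scala.concurrent.duration._

    case object Tick  // hypothetical stand-in for ResubmitFailedStages

    class PeriodicActor extends Actor {
      var task: Cancellable = _

      override def preStart() {
        import context.dispatcher  // ExecutionContext for the scheduler
        // Deliver Tick to ourselves every 200 ms; it is handled like any
        // other message, so the main loop needs no extra locking.
        task = context.system.scheduler.schedule(200.millis, 200.millis, self, Tick)
      }

      override def postStop() { task.cancel() }

      def receive = {
        case Tick => // resubmit failed work here
      }
    }

    // Usage: ActorSystem("demo").actorOf(Props(new PeriodicActor))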

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/docs/configuration.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 979fd0c,9348c77..6906ad2
--- a/pom.xml
+++ b/pom.xml
@@@ -99,11 -99,12 +99,13 @@@
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  
 -    <java.version>1.5</java.version>
 -    <scala.version>2.9.3</scala.version>
 +    <java.version>1.6</java.version>
 +
 +    <scala.version>2.10.3</scala.version>
      <mesos.version>0.13.0</mesos.version>
 +    <akka.version>2.2.3</akka.version>
+     <akka.group>com.typesafe.akka</akka.group>
 -    <akka.version>2.0.5</akka.version>
+     <protobuf.version>2.4.1</protobuf.version>
      <slf4j.version>1.7.2</slf4j.version>
      <log4j.version>1.2.17</log4j.version>
      <hadoop.version>1.0.4</hadoop.version>
@@@ -213,8 -260,8 +215,8 @@@
          <version>0.3.1</version>
        </dependency>
        <dependency>
-         <groupId>com.typesafe.akka</groupId>
+         <groupId>${akka.group}</groupId>
 -        <artifactId>akka-actor</artifactId>
 +        <artifactId>akka-actor_2.10</artifactId>
          <version>${akka.version}</version>
          <exclusions>
            <exclusion>
@@@ -224,8 -271,8 +226,8 @@@
          </exclusions>
        </dependency>
        <dependency>
-         <groupId>com.typesafe.akka</groupId>
+         <groupId>${akka.group}</groupId>
 -        <artifactId>akka-remote</artifactId>
 +        <artifactId>akka-remote_2.10</artifactId>
          <version>${akka.version}</version>
          <exclusions>
            <exclusion>
@@@ -235,8 -282,8 +237,8 @@@
          </exclusions>
        </dependency>
        <dependency>
-         <groupId>com.typesafe.akka</groupId>
+         <groupId>${akka.group}</groupId>
 -        <artifactId>akka-slf4j</artifactId>
 +        <artifactId>akka-slf4j_2.10</artifactId>
          <version>${akka.version}</version>
          <exclusions>
            <exclusion>
@@@ -246,6 -293,17 +248,17 @@@
          </exclusions>
        </dependency>
        <dependency>
+         <groupId>${akka.group}</groupId>
 -        <artifactId>akka-zeromq</artifactId>
++        <artifactId>akka-zeromq_2.10</artifactId>
+         <version>${akka.version}</version>
+         <exclusions>
+           <exclusion>
+             <groupId>org.jboss.netty</groupId>
+             <artifactId>netty</artifactId>
+           </exclusion>
+         </exclusions>
+       </dependency>
+       <dependency>
          <groupId>it.unimi.dsi</groupId>
          <artifactId>fastutil</artifactId>
          <version>6.4.4</version>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/project/SparkBuild.scala
----------------------------------------------------------------------
diff --cc project/SparkBuild.scala
index 3584e88,ac87cff..ea7bf96
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@@ -200,36 -218,38 +216,36 @@@ object SparkBuild extends Build 
      ),
  
      libraryDependencies ++= Seq(
 -      "com.google.guava" % "guava" % "14.0.1",
 -      "com.google.code.findbugs" % "jsr305" % "1.3.9",
 -      "log4j" % "log4j" % "1.2.17",
 -      "org.slf4j" % "slf4j-api" % slf4jVersion,
 -      "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
 -      "commons-daemon" % "commons-daemon" % "1.0.10",  // workaround for bug HADOOP-9407
 -      "com.ning" % "compress-lzf" % "0.8.4",
 -      "org.xerial.snappy" % "snappy-java" % "1.0.5",
 -      "org.ow2.asm" % "asm" % "4.0",
 -      "com.google.protobuf" % "protobuf-java" % protobufVersion,
 -      akkaGroup % "akka-actor" % akkaVersion excludeAll(excludeNetty),
 -      akkaGroup % "akka-remote" % akkaVersion excludeAll(excludeNetty),
 -      akkaGroup % "akka-slf4j" % akkaVersion excludeAll(excludeNetty),
 -      "it.unimi.dsi" % "fastutil" % "6.4.4",
 -      "colt" % "colt" % "1.2.0",
 -      "net.liftweb" % "lift-json_2.9.2" % "2.5",
 -      "org.apache.mesos" % "mesos" % "0.13.0",
 -      "io.netty" % "netty-all" % "4.0.0.Beta2",
 -      "org.apache.derby" % "derby" % "10.4.2.0" % "test",
 -      "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
 -      "net.java.dev.jets3t" % "jets3t" % "0.7.1",
 -      "org.apache.avro" % "avro" % "1.7.4",
 -      "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
 -      "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
 -      "com.codahale.metrics" % "metrics-core" % "3.0.0",
 -      "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
 -      "com.codahale.metrics" % "metrics-json" % "3.0.0",
 -      "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
 -      "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
 -      "com.twitter" % "chill_2.9.3" % "0.3.1",
 -      "com.twitter" % "chill-java" % "0.3.1"
 -    )
 +        "com.google.guava"         % "guava"            % "14.0.1",
 +        "com.google.code.findbugs" % "jsr305"           % "1.3.9",
 +        "log4j"                    % "log4j"            % "1.2.17",
 +        "org.slf4j"                % "slf4j-api"        % slf4jVersion,
 +        "org.slf4j"                % "slf4j-log4j12"    % slf4jVersion,
 +        "commons-daemon"           % "commons-daemon"   % "1.0.10", // workaround for bug HADOOP-9407
 +        "com.ning"                 % "compress-lzf"     % "0.8.4",
 +        "org.xerial.snappy"        % "snappy-java"      % "1.0.5",
 +        "org.ow2.asm"              % "asm"              % "4.0",
 +        "com.google.protobuf"      % "protobuf-java"    % "2.4.1",
-         "com.typesafe.akka"       %% "akka-remote"      % "2.2.3"  excludeAll(excludeNetty), 
-         "com.typesafe.akka"       %% "akka-slf4j"       % "2.2.3"  excludeAll(excludeNetty),
++        akkaGroup                 %% "akka-remote"      % "2.2.3"  excludeAll(excludeNetty),
++        akkaGroup                 %% "akka-slf4j"       % "2.2.3"  excludeAll(excludeNetty),
 +        "net.liftweb"             %% "lift-json"        % "2.5.1"  excludeAll(excludeNetty),
 +        "it.unimi.dsi"             % "fastutil"         % "6.4.4",
 +        "colt"                     % "colt"             % "1.2.0",
 +        "org.apache.mesos"         % "mesos"            % "0.13.0",
 +        "net.java.dev.jets3t"      % "jets3t"           % "0.7.1",
 +        "org.apache.derby"         % "derby"            % "10.4.2.0"                     % "test",
 +        "org.apache.hadoop"        % "hadoop-client"    % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
 +        "org.apache.avro"          % "avro"             % "1.7.4",
 +        "org.apache.avro"          % "avro-ipc"         % "1.7.4" excludeAll(excludeNetty),
 +        "org.apache.zookeeper"     % "zookeeper"        % "3.4.5" excludeAll(excludeNetty),
 +        "com.codahale.metrics"     % "metrics-core"     % "3.0.0",
 +        "com.codahale.metrics"     % "metrics-jvm"      % "3.0.0",
 +        "com.codahale.metrics"     % "metrics-json"     % "3.0.0",
 +        "com.codahale.metrics"     % "metrics-ganglia"  % "3.0.0",
 +        "com.codahale.metrics"     % "metrics-graphite" % "3.0.0",
 +        "com.twitter"             %% "chill"            % "0.3.1",
 +        "com.twitter"              % "chill-java"       % "0.3.1"
 +      )
    )
  
    def rootSettings = sharedSettings ++ Seq(
@@@ -291,11 -311,6 +307,11 @@@
          exclude("com.sun.jdmk", "jmxtools")
          exclude("com.sun.jmx", "jmxri")
          exclude("net.sf.jopt-simple", "jopt-simple")
 +        excludeAll(excludeNetty),
 +      "org.eclipse.paho"      % "mqtt-client"      % "0.4.0",
 +      "com.github.sgroschupf" % "zkclient"         % "0.1"                excludeAll(excludeNetty),
 +      "org.twitter4j"         % "twitter4j-stream" % "3.0.3"              excludeAll(excludeNetty),
-       "com.typesafe.akka"    %%  "akka-zeromq"     % "2.2.3"              excludeAll(excludeNetty)
++      akkaGroup              %% "akka-zeromq"      % "2.2.3"              excludeAll(excludeNetty)
      )
    )
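
Throughout the hunk above, excludeAll(excludeNetty) strips the old org.jboss.netty artifact out of transitive dependency graphs (Akka, ZooKeeper, and Hadoop all pull it in, and it clashes with io.netty). A minimal sketch of how such a rule is defined and applied in sbt, assuming definitions equivalent to the ones SparkBuild.scala uses:

    import sbt._

    // Matches every artifact in the org.jboss.netty group; the sbt
    // analogue of a Maven <exclusion> block, applied per dependency.
    val excludeNetty = ExclusionRule(organization = "org.jboss.netty")

    libraryDependencies +=
      "com.typesafe.akka" %% "akka-remote" % "2.2.3" excludeAll(excludeNetty)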
  

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/python/pyspark/rdd.py
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/streaming/pom.xml
----------------------------------------------------------------------
diff --cc streaming/pom.xml
index 298bc83,4089293..e27b437
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@@ -110,15 -110,8 +110,8 @@@
        <artifactId>scala-library</artifactId>
      </dependency>
      <dependency>
-       <groupId>com.typesafe.akka</groupId>
+       <groupId>${akka.group}</groupId>
 -      <artifactId>akka-zeromq</artifactId>
 +      <artifactId>akka-zeromq_2.10</artifactId>
-       <version>${akka.version}</version>
-       <exclusions>
-         <exclusion>
-           <groupId>org.jboss.netty</groupId>
-           <artifactId>netty</artifactId>
-         </exclusion>
-       </exclusions>
      </dependency>
      <dependency>
        <groupId>org.scalatest</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/603af51b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a6ce1b6,f15f3c7..9ab2073
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@@ -17,24 -17,30 +17,30 @@@
  
  package org.apache.spark.deploy.yarn
  
+ import java.lang.{Boolean => JBoolean}
+ import java.util.{Collections, Set => JSet}
+ import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+ import java.util.concurrent.atomic.AtomicInteger
+ 
+ import scala.collection
+ import scala.collection.JavaConversions._
+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+ 
  import org.apache.spark.Logging
- import org.apache.spark.util.Utils
  import org.apache.spark.scheduler.SplitInfo
- import scala.collection
- import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
  import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+ import org.apache.spark.util.Utils
+ 
+ import org.apache.hadoop.conf.Configuration
+ import org.apache.hadoop.yarn.api.AMRMProtocol
+ import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
+ import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
+ import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
  import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
  import org.apache.hadoop.yarn.util.{RackResolver, Records}
- import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
- import java.util.concurrent.atomic.AtomicInteger
- import org.apache.hadoop.yarn.api.AMRMProtocol
- import collection.JavaConversions._
- import collection.mutable.{ArrayBuffer, HashMap, HashSet}
- import org.apache.hadoop.conf.Configuration
- import java.util.{Collections, Set => JSet}
- import java.lang.{Boolean => JBoolean}
+ 
  
 -object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
 +object AllocationType extends Enumeration {
    type AllocationType = Value
    val HOST, RACK, ANY = Value
  }
@@@ -209,9 -235,10 +235,10 @@@ private[yarn] class YarnAllocationHandl
            numWorkersRunning.decrementAndGet()
          }
          else {
-           // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter)
+           // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
+           // (workerIdCounter)
            val workerId = workerIdCounter.incrementAndGet().toString
 -          val driverUrl = "akka://spark@%s:%s/user/%s".format(
 +          val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
              System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
              CoarseGrainedSchedulerBackend.ACTOR_NAME)
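
Two smaller changes in the YarnAllocationHandler hunks above also track the 2.10 upgrade. First, Scala 2.10 removed the Enumeration(names: String*) constructor the old AllocationType used, so values are now named after their val identifiers (or via Value("NAME") where a custom name is needed). A minimal sketch of both spellings:

    // Scala 2.10 style: value names come from the val identifiers.
    object AllocationType extends Enumeration {
      type AllocationType = Value
      val HOST, RACK, ANY = Value
    }

    // Custom names, if they must differ from the identifiers
    // (hypothetical example):
    object Verbosity extends Enumeration {
      val Quiet = Value("QUIET")
      val Loud  = Value("LOUD")
    }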
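
Second, the akka:// -> akka.tcp:// change reflects Akka 2.2, whose new remoting names the transport in the actor path scheme. A minimal sketch of how such a driver URL is assembled; the host, port, and actor name are illustrative values:

    // Akka 2.2 remote actor paths name the transport in the scheme:
    //   akka.tcp://<actor-system>@<host>:<port>/user/<actor-name>
    val host = "driver-host.example.com"
    val port = "7077"
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      host, port, "CoarseGrainedScheduler")
    // Akka 2.0/2.1 paths began with plain "akka://" instead.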