You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/23 06:20:21 UTC

[2/2] spark git commit: [SPARK-7997][CORE] Remove Akka from Spark Core and Streaming

[SPARK-7997][CORE] Remove Akka from Spark Core and Streaming

- Remove Akka dependency from core. Note: the streaming-akka project still uses Akka.
- Remove HttpFileServer
- Remove Akka configs from SparkConf and SSLOptions
- Rename `spark.akka.frameSize` to `spark.rpc.message.maxSize`. I think it's still worth to keep this config because using `DirectTaskResult` or `IndirectTaskResult`  depends on it.
- Update comments and docs

Author: Shixiong Zhu <sh...@databricks.com>

Closes #10854 from zsxwing/remove-akka.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc1babd6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc1babd6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc1babd6

Branch: refs/heads/master
Commit: bc1babd63da4ee56e6d371eb24805a5d714e8295
Parents: d8fefab
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Jan 22 21:20:04 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Jan 22 21:20:04 2016 -0800

----------------------------------------------------------------------
 core/pom.xml                                    |  17 +-
 .../scala/org/apache/spark/ContextCleaner.scala |   6 +-
 .../scala/org/apache/spark/HttpFileServer.scala |  91 -------
 .../org/apache/spark/MapOutputTracker.scala     |   7 +-
 .../scala/org/apache/spark/SSLOptions.scala     |  43 +--
 .../org/apache/spark/SecurityManager.scala      |  19 +-
 .../main/scala/org/apache/spark/SparkConf.scala |  32 +--
 .../main/scala/org/apache/spark/SparkEnv.scala  |  49 +---
 .../scala/org/apache/spark/deploy/Client.scala  |   4 -
 .../org/apache/spark/deploy/SparkSubmit.scala   |   2 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  12 +-
 .../org/apache/spark/executor/Executor.scala    |   8 +-
 .../cluster/CoarseGrainedClusterMessage.scala   |   1 -
 .../cluster/CoarseGrainedSchedulerBackend.scala |  15 +-
 .../cluster/SimrSchedulerBackend.scala          |   2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala | 139 ----------
 .../scala/org/apache/spark/util/RpcUtils.scala  |  12 +
 .../org/apache/spark/FileServerSuite.scala      | 264 -------------------
 .../apache/spark/HeartbeatReceiverSuite.scala   |   4 +-
 .../org/apache/spark/LocalSparkContext.scala    |   2 +-
 .../apache/spark/MapOutputTrackerSuite.scala    |  12 +-
 .../org/apache/spark/SecurityManagerSuite.scala |  12 -
 .../apache/spark/deploy/DeployTestUtils.scala   |   4 +-
 .../StandaloneDynamicAllocationSuite.scala      |   2 +-
 .../spark/deploy/worker/DriverRunnerTest.scala  |   2 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      |   4 +-
 .../CoarseGrainedSchedulerBackendSuite.scala    |   8 +-
 .../spark/scheduler/SparkListenerSuite.scala    |  17 +-
 .../spark/scheduler/TaskResultGetterSuite.scala |  26 +-
 dev/deps/spark-deps-hadoop-2.2                  |   5 -
 dev/deps/spark-deps-hadoop-2.3                  |   5 -
 dev/deps/spark-deps-hadoop-2.4                  |   5 -
 dev/deps/spark-deps-hadoop-2.6                  |   5 -
 dev/deps/spark-deps-hadoop-2.7                  |   5 -
 docs/cluster-overview.md                        |   2 +-
 docs/configuration.md                           |  65 +----
 docs/security.md                                |  30 +--
 .../spark/mllib/feature/VectorTransformer.scala |   2 +-
 .../mllib/util/LocalClusterSparkContext.scala   |   2 +-
 pom.xml                                         |   5 +
 project/MimaExcludes.scala                      |   4 +-
 .../streaming/ReceivedBlockTrackerSuite.scala   |   1 -
 .../spark/deploy/yarn/ExecutorRunnable.scala    |   2 +-
 43 files changed, 123 insertions(+), 831 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 2071a58..0ab170e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -186,19 +186,6 @@
       <artifactId>commons-net</artifactId>
     </dependency>
     <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-remote_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-testkit_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
     </dependency>
@@ -225,6 +212,10 @@
       <artifactId>netty-all</artifactId>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.clearspring.analytics</groupId>
       <artifactId>stream</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4628093..5a42299 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -86,8 +86,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
    *
    * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
-   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
-   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * workaround for the issue, which is ultimately caused by the way the BlockManager endpoints
+   * issue inter-dependent blocking RPC messages to each other at high frequencies. This happens,
    * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
    * longer in scope.
    */
@@ -101,7 +101,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
    * parameter by default disables blocking on shuffle cleanups. Note that this does not affect
    * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
-   * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+   * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
    * resolved.
    */
   private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
deleted file mode 100644
index 46f9f9e..0000000
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-
-import com.google.common.io.Files
-
-import org.apache.spark.util.Utils
-
-private[spark] class HttpFileServer(
-    conf: SparkConf,
-    securityManager: SecurityManager,
-    requestedPort: Int = 0)
-  extends Logging {
-
-  var baseDir : File = null
-  var fileDir : File = null
-  var jarDir : File = null
-  var httpServer : HttpServer = null
-  var serverUri : String = null
-
-  def initialize() {
-    baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
-    fileDir = new File(baseDir, "files")
-    jarDir = new File(baseDir, "jars")
-    fileDir.mkdir()
-    jarDir.mkdir()
-    logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
-    httpServer.start()
-    serverUri = httpServer.uri
-    logDebug("HTTP file server started at: " + serverUri)
-  }
-
-  def stop() {
-    httpServer.stop()
-
-    // If we only stop sc, but the driver process still run as a services then we need to delete
-    // the tmp dir, if not, it will create too many tmp dirs
-    try {
-      Utils.deleteRecursively(baseDir)
-    } catch {
-      case e: Exception =>
-        logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e)
-    }
-  }
-
-  def addFile(file: File) : String = {
-    addFileToDir(file, fileDir)
-    serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
-  }
-
-  def addJar(file: File) : String = {
-    addFileToDir(file, jarDir)
-    serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
-  }
-
-  def addDirectory(path: String, resourceBase: String): String = {
-    httpServer.addDirectory(path, resourceBase)
-    serverUri + path
-  }
-
-  def addFileToDir(file: File, dir: File) : String = {
-    // Check whether the file is a directory. If it is, throw a more meaningful exception.
-    // If we don't catch this, Guava throws a very confusing error message:
-    //   java.io.FileNotFoundException: [file] (No such file or directory)
-    // even though the directory ([file]) exists.
-    if (file.isDirectory) {
-      throw new IllegalArgumentException(s"$file cannot be a directory.")
-    }
-    Files.copy(file, new File(dir, file.getName))
-    dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1b59beb..eb2fdec 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -40,7 +40,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 private[spark] class MapOutputTrackerMasterEndpoint(
     override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends RpcEndpoint with Logging {
-  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case GetMapOutputStatuses(shuffleId: Int) =>
@@ -48,9 +48,10 @@ private[spark] class MapOutputTrackerMasterEndpoint(
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
       val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
       val serializedSize = mapOutputStatuses.length
-      if (serializedSize > maxAkkaFrameSize) {
+      if (serializedSize > maxRpcMessageSize) {
+
         val msg = s"Map output statuses were $serializedSize bytes which " +
-          s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
+          s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
 
         /* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender.
          * A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/SSLOptions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 261265f..d755f07 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -21,9 +21,6 @@ import java.io.File
 import java.security.NoSuchAlgorithmException
 import javax.net.ssl.SSLContext
 
-import scala.collection.JavaConverters._
-
-import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
 import org.eclipse.jetty.util.ssl.SslContextFactory
 
 /**
@@ -31,8 +28,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
  * generate specific objects to configure SSL for different communication protocols.
  *
  * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
- * by the protocol, which it can generate the configuration for. Since Akka doesn't support client
- * authentication with SSL, SSLOptions cannot support it either.
+ * by the protocol, which it can generate the configuration for.
  *
  * @param enabled             enables or disables SSL; if it is set to false, the rest of the
  *                            settings are disregarded
@@ -88,43 +84,6 @@ private[spark] case class SSLOptions(
     }
   }
 
-  /**
-   * Creates an Akka configuration object which contains all the SSL settings represented by this
-   * object. It can be used then to compose the ultimate Akka configuration.
-   */
-  def createAkkaConfig: Option[Config] = {
-    if (enabled) {
-      if (keyStoreType.isDefined) {
-        logWarning("Akka configuration does not support key store type.");
-      }
-      if (trustStoreType.isDefined) {
-        logWarning("Akka configuration does not support trust store type.");
-      }
-
-      Some(ConfigFactory.empty()
-        .withValue("akka.remote.netty.tcp.security.key-store",
-          ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.key-store-password",
-          ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.trust-store",
-          ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.trust-store-password",
-          ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.key-password",
-          ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.random-number-generator",
-          ConfigValueFactory.fromAnyRef(""))
-        .withValue("akka.remote.netty.tcp.security.protocol",
-          ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
-        .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
-          ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
-        .withValue("akka.remote.netty.tcp.enable-ssl",
-          ConfigValueFactory.fromAnyRef(true)))
-    } else {
-      None
-    }
-  }
-
   /*
    * The supportedAlgorithms set is a subset of the enabledAlgorithms that
    * are supported by the current Java security provider for this protocol.

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/SecurityManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index c5aec05..0675957 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -67,17 +67,6 @@ import org.apache.spark.util.Utils
  * At this point spark has multiple communication protocols that need to be secured and
  * different underlying mechanisms are used depending on the protocol:
  *
- *  - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
- *            Akka remoting allows you to specify a secure cookie that will be exchanged
- *            and ensured to be identical in the connection handshake between the client
- *            and the server. If they are not identical then the client will be refused
- *            to connect to the server. There is no control of the underlying
- *            authentication mechanism so its not clear if the password is passed in
- *            plaintext or uses DIGEST-MD5 or some other mechanism.
- *
- *            Akka also has an option to turn on SSL, this option is currently supported (see
- *            the details below).
- *
  *  - HTTP for broadcast and file server (via HttpServer) ->  Spark currently uses Jetty
  *            for the HttpServer. Jetty supports multiple authentication mechanisms -
  *            Basic, Digest, Form, Spengo, etc. It also supports multiple different login
@@ -168,16 +157,16 @@ import org.apache.spark.util.Utils
  *  denote the global configuration for all the supported protocols. In order to override the global
  *  configuration for the particular protocol, the properties must be overwritten in the
  *  protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
- *  configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
- *  Akka based connections or `fs` for broadcast and file server.
+ *  configuration for particular protocol denoted by `yyy`. Currently `yyy` can be only`fs` for
+ *  broadcast and file server.
  *
  *  Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
  *  options that can be specified.
  *
  *  SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
  *  object parses Spark configuration at a given namespace and builds the common representation
- *  of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
- *  TypeSafe configuration for Akka or SSLContextFactory for Jetty.
+ *  of SSL settings. SSLOptions is then used to provide protocol-specific SSLContextFactory for
+ *  Jetty.
  *
  *  SSL must be configured on each node and configured for each component involved in
  *  communication using the particular protocol. In YARN clusters, the key-store can be prepared on

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 340e1f7..36e240e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -344,17 +344,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
           .map{case (k, v) => (k.substring(prefix.length), v)}
   }
 
-  /** Get all akka conf variables set on this SparkConf */
-  def getAkkaConf: Seq[(String, String)] =
-    /* This is currently undocumented. If we want to make this public we should consider
-     * nesting options under the spark namespace to avoid conflicts with user akka options.
-     * Otherwise users configuring their own akka code via system properties could mess up
-     * spark's akka options.
-     *
-     *   E.g. spark.akka.option.x.y.x = "value"
-     */
-    getAll.filter { case (k, _) => isAkkaConf(k) }
-
   /**
    * Returns the Spark application id, valid in the Driver after TaskScheduler registration and
    * from the start in the Executor.
@@ -600,7 +589,9 @@ private[spark] object SparkConf extends Logging {
     "spark.yarn.max.executor.failures" -> Seq(
       AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
     "spark.memory.offHeap.enabled" -> Seq(
-      AlternateConfig("spark.unsafe.offHeap", "1.6"))
+      AlternateConfig("spark.unsafe.offHeap", "1.6")),
+    "spark.rpc.message.maxSize" -> Seq(
+      AlternateConfig("spark.akka.frameSize", "1.6"))
     )
 
   /**
@@ -616,20 +607,12 @@ private[spark] object SparkConf extends Logging {
   }
 
   /**
-   * Return whether the given config is an akka config (e.g. akka.actor.provider).
-   * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
-   */
-  def isAkkaConf(name: String): Boolean = name.startsWith("akka.")
-
-  /**
    * Return whether the given config should be passed to an executor on start-up.
    *
-   * Certain akka and authentication configs are required from the executor when it connects to
+   * Certain authentication configs are required from the executor when it connects to
    * the scheduler, while the rest of the spark configs can be inherited from the driver later.
    */
   def isExecutorStartupConf(name: String): Boolean = {
-    isAkkaConf(name) ||
-    name.startsWith("spark.akka") ||
     (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
     name.startsWith("spark.ssl") ||
     name.startsWith("spark.rpc") ||
@@ -664,12 +647,19 @@ private[spark] object SparkConf extends Logging {
       logWarning(
         s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
         s"may be removed in the future. ${cfg.deprecationMessage}")
+      return
     }
 
     allAlternatives.get(key).foreach { case (newKey, cfg) =>
       logWarning(
         s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
         s"and may be removed in the future. Please use the new key '$newKey' instead.")
+      return
+    }
+    if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
+      logWarning(
+        s"The configuration key $key is not supported any more " +
+          s"because Spark doesn't use Akka since 2.0")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ec43be0..9461afd 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -23,7 +23,6 @@ import java.net.Socket
 import scala.collection.mutable
 import scala.util.Properties
 
-import akka.actor.ActorSystem
 import com.google.common.collect.MapMaker
 
 import org.apache.spark.annotation.DeveloperApi
@@ -39,12 +38,12 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage._
-import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
 
 /**
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
- * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
+ * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
  * Spark code finds the SparkEnv through a global variable, so all the threads can access the same
  * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
  *
@@ -55,7 +54,6 @@ import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
 class SparkEnv (
     val executorId: String,
     private[spark] val rpcEnv: RpcEnv,
-    _actorSystem: ActorSystem, // TODO Remove actorSystem
     val serializer: Serializer,
     val closureSerializer: Serializer,
     val cacheManager: CacheManager,
@@ -71,10 +69,6 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
 
-  // TODO Remove actorSystem
-  @deprecated("Actor system is no longer supported as of 1.4.0", "1.4.0")
-  val actorSystem: ActorSystem = _actorSystem
-
   private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
@@ -96,13 +90,8 @@ class SparkEnv (
       blockManager.master.stop()
       metricsSystem.stop()
       outputCommitCoordinator.stop()
-      actorSystem.shutdown()
       rpcEnv.shutdown()
-
-      // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
-      // down, but let's call it anyway in case it gets fixed in a later release
-      // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-      // actorSystem.awaitTermination()
+      rpcEnv.awaitTermination()
 
       // Note that blockTransferService is stopped by BlockManager since it is started by it.
 
@@ -152,8 +141,8 @@ class SparkEnv (
 object SparkEnv extends Logging {
   @volatile private var env: SparkEnv = _
 
-  private[spark] val driverActorSystemName = "sparkDriver"
-  private[spark] val executorActorSystemName = "sparkExecutor"
+  private[spark] val driverSystemName = "sparkDriver"
+  private[spark] val executorSystemName = "sparkExecutor"
 
   def set(e: SparkEnv) {
     env = e
@@ -202,7 +191,7 @@ object SparkEnv extends Logging {
 
   /**
    * Create a SparkEnv for an executor.
-   * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+   * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
    */
   private[spark] def createExecutorEnv(
       conf: SparkConf,
@@ -245,28 +234,11 @@ object SparkEnv extends Logging {
 
     val securityManager = new SecurityManager(conf)
 
-    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-    // Create the ActorSystem for Akka and get the port it binds to.
-    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
+    val systemName = if (isDriver) driverSystemName else executorSystemName
+    val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
       clientMode = !isDriver)
-    val actorSystem: ActorSystem = {
-        val actorSystemPort =
-          if (port == 0 || rpcEnv.address == null) {
-            port
-          } else {
-            rpcEnv.address.port + 1
-          }
-        // Create a ActorSystem for legacy codes
-        AkkaUtils.createActorSystem(
-          actorSystemName + "ActorSystem",
-          hostname,
-          actorSystemPort,
-          conf,
-          securityManager
-        )._1
-      }
 
-    // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
     // In the non-driver case, the RPC env's address may be null since it may not be listening
     // for incoming connections.
     if (isDriver) {
@@ -325,7 +297,7 @@ object SparkEnv extends Logging {
       new MapOutputTrackerWorker(conf)
     }
 
-    // Have to assign trackerActor after initialization as MapOutputTrackerActor
+    // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
     // requires the MapOutputTracker itself
     mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
       new MapOutputTrackerMasterEndpoint(
@@ -398,7 +370,6 @@ object SparkEnv extends Logging {
     val envInstance = new SparkEnv(
       executorId,
       rpcEnv,
-      actorSystem,
       serializer,
       closureSerializer,
       cacheManager,

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 63a20ab..dcef03e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -219,11 +219,7 @@ object Client {
     val conf = new SparkConf()
     val driverArgs = new ClientArguments(args)
 
-    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
-      conf.set("spark.akka.logLifecycleEvents", "true")
-    }
     conf.set("spark.rpc.askTimeout", "10")
-    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
     val rpcEnv =

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a1e8da1..a6749f7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -183,7 +183,7 @@ object SparkSubmit {
     }
 
      // In standalone cluster mode, there are two submission gateways:
-     //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+     //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
      //   (2) The new REST-based gateway introduced in Spark 1.3
      // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
      // to use the legacy gateway if the master endpoint turns out to be not a REST server.

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 58bd9ca..e3a6c4c 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -37,7 +37,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
     driverUrl: String,
     executorId: String,
-    hostPort: String,
     cores: Int,
     userClassPath: Seq[URL],
     env: SparkEnv)
@@ -55,8 +54,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[RegisterExecutorResponse](
-        RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
+      ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
@@ -184,14 +182,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, isLocal = false)
 
-      // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
-      // connections (e.g., if it's using akka). Otherwise, the executor is running in
-      // client mode only, and does not accept incoming connections.
-      val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
-          hostname + ":" + port
-        }.orNull
       env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
-        env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
+        env.rpcEnv, driverUrl, executorId, cores, userClassPath, env))
       workerUrl.foreach { url =>
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 75d7e34..030ae41 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util._
  * Spark executor, backed by a threadpool to run tasks.
  *
  * This can be used with Mesos, YARN, and the standalone scheduler.
- * An internal RPC interface (at the moment Akka) is used for communication with the driver,
+ * An internal RPC interface is used for communication with the driver,
  * except in the case of Mesos fine-grained mode.
  */
 private[spark] class Executor(
@@ -97,9 +97,9 @@ private[spark] class Executor(
   // Set the classloader for serializer
   env.serializer.setDefaultClassLoader(replClassLoader)
 
-  // Akka's message frame size. If task result is bigger than this, we use the block manager
+  // Max RPC message size. If task result is bigger than this, we use the block manager
   // to send the result back.
-  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
 
   // Limit of bytes for total size of results (default is 1GB)
   private val maxResultSize = Utils.getMaxResultSize(conf)
@@ -263,7 +263,7 @@ private[spark] class Executor(
               s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
               s"dropping it.")
             ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
-          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+          } else if (resultSize >= maxRpcMessageSize) {
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
               blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index f3d0d85..29e469c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -48,7 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RegisterExecutor(
       executorId: String,
       executorRef: RpcEndpointRef,
-      hostPort: String,
       cores: Int,
       logUrls: Map[String, String])
     extends CoarseGrainedClusterMessage

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b808993..f69a3d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer, ThreadUtils, Utils}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse-grained executors to connect.
@@ -46,7 +46,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Total number of executors that are currently registered
   var totalRegisteredExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
-  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
   // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   var minRegisteredRatio =
@@ -134,7 +134,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
-      case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
+      case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
         if (executorDataMap.contains(executorId)) {
           context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
         } else {
@@ -224,14 +224,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
         val serializedTask = ser.serialize(task)
-        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+        if (serializedTask.limit >= maxRpcMessageSize) {
           scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
-                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
-                "spark.akka.frameSize or using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
-                AkkaUtils.reservedSizeBytes)
+                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
+                "spark.rpc.message.maxSize or using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
               taskSetMgr.abort(msg)
             } catch {
               case e: Exception => logError("Exception in error callback", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0a6f2c0..a298cf5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SimrSchedulerBackend(
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
 
     logInfo("Writing to HDFS file: "  + driverFilePath)
-    logInfo("Writing Akka address: "  + driverUrl)
+    logInfo("Writing Driver address: "  + driverUrl)
     logInfo("Writing Spark UI Address: " + appUIAddress)
 
     // Create temporary file to prevent race condition where executors get empty driverUrl file

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
deleted file mode 100644
index 3f4ac9b..0000000
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
-/**
- * Various utility classes for working with Akka.
- */
-private[spark] object AkkaUtils extends Logging {
-
-  /**
-   * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
-   * ActorSystem itself and its port (which is hard to get from Akka).
-   *
-   * Note: the `name` parameter is important, as even if a client sends a message to right
-   * host + port, if the system name is incorrect, Akka will drop the message.
-   *
-   * If indestructible is set to true, the Actor System will continue running in the event
-   * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
-   */
-  def createActorSystem(
-      name: String,
-      host: String,
-      port: Int,
-      conf: SparkConf,
-      securityManager: SecurityManager): (ActorSystem, Int) = {
-    val startService: Int => (ActorSystem, Int) = { actualPort =>
-      doCreateActorSystem(name, host, actualPort, conf, securityManager)
-    }
-    Utils.startServiceOnPort(port, startService, conf, name)
-  }
-
-  private def doCreateActorSystem(
-      name: String,
-      host: String,
-      port: Int,
-      conf: SparkConf,
-      securityManager: SecurityManager): (ActorSystem, Int) = {
-
-    val akkaThreads = conf.getInt("spark.akka.threads", 4)
-    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
-    val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
-      conf.get("spark.network.timeout", "120s"))
-    val akkaFrameSize = maxFrameSizeBytes(conf)
-    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
-    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
-    if (!akkaLogLifecycleEvents) {
-      // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
-      // See: https://www.assembla.com/spaces/akka/tickets/3787#/
-      Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
-    }
-
-    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
-
-    val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
-    val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")
-
-    val secretKey = securityManager.getSecretKey()
-    val isAuthOn = securityManager.isAuthenticationEnabled()
-    if (isAuthOn && secretKey == null) {
-      throw new Exception("Secret key is null with authentication on")
-    }
-    val requireCookie = if (isAuthOn) "on" else "off"
-    val secureCookie = if (isAuthOn) secretKey else ""
-    logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
-
-    val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig
-        .getOrElse(ConfigFactory.empty())
-
-    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)
-      .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
-      s"""
-      |akka.daemonic = on
-      |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
-      |akka.stdout-loglevel = "ERROR"
-      |akka.jvm-exit-on-fatal-error = off
-      |akka.remote.require-cookie = "$requireCookie"
-      |akka.remote.secure-cookie = "$secureCookie"
-      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
-      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
-      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
-      |akka.remote.netty.tcp.hostname = "$host"
-      |akka.remote.netty.tcp.port = $port
-      |akka.remote.netty.tcp.tcp-nodelay = on
-      |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
-      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
-      |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
-      |akka.actor.default-dispatcher.throughput = $akkaBatchSize
-      |akka.log-config-on-start = $logAkkaConfig
-      |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
-      |akka.log-dead-letters = $lifecycleEvents
-      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
-      """.stripMargin))
-
-    val actorSystem = ActorSystem(name, akkaConf)
-    val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
-    val boundPort = provider.getDefaultAddress.port.get
-    (actorSystem, boundPort)
-  }
-
-  private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
-
-  /** Returns the configured max frame size for Akka messages in bytes. */
-  def maxFrameSizeBytes(conf: SparkConf): Int = {
-    val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
-    if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
-      throw new IllegalArgumentException(
-        s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB")
-    }
-    frameSizeInMB * 1024 * 1024
-  }
-
-  /** Space reserved for extra data in an Akka message besides serialized task or task result. */
-  val reservedSizeBytes = 200 * 1024
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index b68936f..2bb8de5 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -53,4 +53,16 @@ private[spark] object RpcUtils {
   def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
     RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
   }
+
+  private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
+
+  /** Returns the configured max message size for messages in bytes. */
+  def maxMessageSizeBytes(conf: SparkConf): Int = {
+    val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
+    if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
+      throw new IllegalArgumentException(
+        s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
+    }
+    maxSizeInMB * 1024 * 1024
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
deleted file mode 100644
index bc7059b..0000000
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.net.URI
-import java.util.jar.{JarEntry, JarOutputStream}
-import javax.net.ssl.SSLException
-
-import com.google.common.io.{ByteStreams, Files}
-import org.apache.commons.lang3.RandomUtils
-
-import org.apache.spark.util.Utils
-
-class FileServerSuite extends SparkFunSuite with LocalSparkContext {
-
-  import SSLSampleConfigs._
-
-  @transient var tmpDir: File = _
-  @transient var tmpFile: File = _
-  @transient var tmpJarUrl: String = _
-
-  def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")
-
-  override def beforeEach() {
-    super.beforeEach()
-    resetSparkContext()
-  }
-
-  override def beforeAll() {
-    super.beforeAll()
-
-    tmpDir = Utils.createTempDir()
-    val testTempDir = new File(tmpDir, "test")
-    testTempDir.mkdir()
-
-    val textFile = new File(testTempDir, "FileServerSuite.txt")
-    val pw = new PrintWriter(textFile)
-    // scalastyle:off println
-    pw.println("100")
-    // scalastyle:on println
-    pw.close()
-
-    val jarFile = new File(testTempDir, "test.jar")
-    val jarStream = new FileOutputStream(jarFile)
-    val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-
-    val jarEntry = new JarEntry(textFile.getName)
-    jar.putNextEntry(jarEntry)
-
-    val in = new FileInputStream(textFile)
-    ByteStreams.copy(in, jar)
-
-    in.close()
-    jar.close()
-    jarStream.close()
-
-    tmpFile = textFile
-    tmpJarUrl = jarFile.toURI.toURL.toString
-  }
-
-  override def afterAll() {
-    try {
-      Utils.deleteRecursively(tmpDir)
-    } finally {
-      super.afterAll()
-    }
-  }
-
-  test("Distributing files locally") {
-    sc = new SparkContext("local[4]", "test", newConf)
-    sc.addFile(tmpFile.toString)
-    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
-    val result = sc.parallelize(testData).reduceByKey {
-      val path = SparkFiles.get("FileServerSuite.txt")
-      val in = new BufferedReader(new FileReader(path))
-      val fileVal = in.readLine().toInt
-      in.close()
-      _ * fileVal + _ * fileVal
-    }.collect()
-    assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
-  }
-
-  test("Distributing files locally security On") {
-    val sparkConf = new SparkConf(false)
-    sparkConf.set("spark.authenticate", "true")
-    sparkConf.set("spark.authenticate.secret", "good")
-    sc = new SparkContext("local[4]", "test", sparkConf)
-
-    sc.addFile(tmpFile.toString)
-    assert(sc.env.securityManager.isAuthenticationEnabled() === true)
-    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
-    val result = sc.parallelize(testData).reduceByKey {
-      val path = SparkFiles.get("FileServerSuite.txt")
-      val in = new BufferedReader(new FileReader(path))
-      val fileVal = in.readLine().toInt
-      in.close()
-      _ * fileVal + _ * fileVal
-    }.collect()
-    assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
-  }
-
-  test("Distributing files locally using URL as input") {
-    // addFile("file:///....")
-    sc = new SparkContext("local[4]", "test", newConf)
-    sc.addFile(new File(tmpFile.toString).toURI.toString)
-    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
-    val result = sc.parallelize(testData).reduceByKey {
-      val path = SparkFiles.get("FileServerSuite.txt")
-      val in = new BufferedReader(new FileReader(path))
-      val fileVal = in.readLine().toInt
-      in.close()
-      _ * fileVal + _ * fileVal
-    }.collect()
-    assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
-  }
-
-  test ("Dynamically adding JARS locally") {
-    sc = new SparkContext("local[4]", "test", newConf)
-    sc.addJar(tmpJarUrl)
-    val testData = Array((1, 1))
-    sc.parallelize(testData).foreach { x =>
-      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
-        throw new SparkException("jar not added")
-      }
-    }
-  }
-
-  test("Distributing files on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
-    sc.addFile(tmpFile.toString)
-    val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
-    val result = sc.parallelize(testData).reduceByKey {
-      val path = SparkFiles.get("FileServerSuite.txt")
-      val in = new BufferedReader(new FileReader(path))
-      val fileVal = in.readLine().toInt
-      in.close()
-      _ * fileVal + _ * fileVal
-    }.collect()
-    assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
-  }
-
-  test ("Dynamically adding JARS on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
-    sc.addJar(tmpJarUrl)
-    val testData = Array((1, 1))
-    sc.parallelize(testData).foreach { x =>
-      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
-        throw new SparkException("jar not added")
-      }
-    }
-  }
-
-  test ("Dynamically adding JARS on a standalone cluster using local: URL") {
-    sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
-    sc.addJar(tmpJarUrl.replace("file", "local"))
-    val testData = Array((1, 1))
-    sc.parallelize(testData).foreach { x =>
-      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
-        throw new SparkException("jar not added")
-      }
-    }
-  }
-
-  test ("HttpFileServer should work with SSL") {
-    val sparkConf = sparkSSLConfig()
-    val sm = new SecurityManager(sparkConf)
-    val server = new HttpFileServer(sparkConf, sm, 0)
-    try {
-      server.initialize()
-
-      fileTransferTest(server, sm)
-    } finally {
-      server.stop()
-    }
-  }
-
-  test ("HttpFileServer should work with SSL and good credentials") {
-    val sparkConf = sparkSSLConfig()
-    sparkConf.set("spark.authenticate", "true")
-    sparkConf.set("spark.authenticate.secret", "good")
-
-    val sm = new SecurityManager(sparkConf)
-    val server = new HttpFileServer(sparkConf, sm, 0)
-    try {
-      server.initialize()
-
-      fileTransferTest(server, sm)
-    } finally {
-      server.stop()
-    }
-  }
-
-  test ("HttpFileServer should not work with valid SSL and bad credentials") {
-    val sparkConf = sparkSSLConfig()
-    sparkConf.set("spark.authenticate", "true")
-    sparkConf.set("spark.authenticate.secret", "bad")
-
-    val sm = new SecurityManager(sparkConf)
-    val server = new HttpFileServer(sparkConf, sm, 0)
-    try {
-      server.initialize()
-
-      intercept[IOException] {
-        fileTransferTest(server)
-      }
-    } finally {
-      server.stop()
-    }
-  }
-
-  test ("HttpFileServer should not work with SSL when the server is untrusted") {
-    val sparkConf = sparkSSLConfigUntrusted()
-    val sm = new SecurityManager(sparkConf)
-    val server = new HttpFileServer(sparkConf, sm, 0)
-    try {
-      server.initialize()
-
-      intercept[SSLException] {
-        fileTransferTest(server)
-      }
-    } finally {
-      server.stop()
-    }
-  }
-
-  def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
-    val randomContent = RandomUtils.nextBytes(100)
-    val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
-    Files.write(randomContent, file)
-    server.addFile(file)
-
-    val uri = new URI(server.serverUri + "/files/" + file.getName)
-
-    val connection = if (sm != null && sm.isAuthenticationEnabled()) {
-      Utils.constructURIForAuthentication(uri, sm).toURL.openConnection()
-    } else {
-      uri.toURL.openConnection()
-    }
-
-    if (sm != null) {
-      Utils.setupSecureURLConnection(connection, sm)
-    }
-
-    val buf = ByteStreams.toByteArray(connection.getInputStream)
-    assert(buf === randomContent)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18e5350..c7f629a 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -175,9 +175,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
     fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
-      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
+      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
     fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
-      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
+      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
     addExecutorAndVerify(executorId2)

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index e1a0bf7..24ec99c 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -52,7 +52,7 @@ object LocalSparkContext {
     if (sc != null) {
       sc.stop()
     }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    // To avoid RPC rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 3819c0a..6546def 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,9 +154,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     slaveRpcEnv.shutdown()
   }
 
-  test("remote fetch below akka frame size") {
+  test("remote fetch below max RPC message size") {
     val newConf = new SparkConf
-    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.rpc.message.maxSize", "1")
     newConf.set("spark.rpc.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
     rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
 
-    // Frame size should be ~123B, and no exception should be thrown
+    // Message size should be ~123B, and no exception should be thrown
     masterTracker.registerShuffle(10, 1)
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
@@ -179,9 +179,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     rpcEnv.shutdown()
   }
 
-  test("remote fetch exceeds akka frame size") {
+  test("remote fetch exceeds max RPC message size") {
     val newConf = new SparkConf
-    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.rpc.message.maxSize", "1")
     newConf.set("spark.rpc.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
@@ -189,7 +189,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
     rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
 
-    // Frame size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
+    // Message size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
     // Note that the size is hand-selected here because map output statuses are compressed before
     // being sent.
     masterTracker.registerShuffle(20, 100)

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index 7603cef..8bdb237 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -181,10 +181,8 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
     "SSL_DHE_RSA_WITH_AES_128_CBC_SHA256")
 
     val securityManager = new SecurityManager(conf)
-    val akkaSSLOptions = securityManager.getSSLOptions("akka")
 
     assert(securityManager.fileServerSSLOptions.enabled === true)
-    assert(akkaSSLOptions.enabled === true)
 
     assert(securityManager.sslSocketFactory.isDefined === true)
     assert(securityManager.hostnameVerifier.isDefined === true)
@@ -198,16 +196,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
     assert(securityManager.fileServerSSLOptions.keyPassword === Some("password"))
     assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2"))
     assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms)
-
-    assert(akkaSSLOptions.trustStore.isDefined === true)
-    assert(akkaSSLOptions.trustStore.get.getName === "truststore")
-    assert(akkaSSLOptions.keyStore.isDefined === true)
-    assert(akkaSSLOptions.keyStore.get.getName === "keystore")
-    assert(akkaSSLOptions.trustStorePassword === Some("password"))
-    assert(akkaSSLOptions.keyStorePassword === Some("password"))
-    assert(akkaSSLOptions.keyPassword === Some("password"))
-    assert(akkaSSLOptions.protocol === Some("TLSv1.2"))
-    assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms)
   }
 
   test("ssl off setup") {

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 190e4dd..9c13c15 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -69,7 +69,7 @@ private[deploy] object DeployTestUtils {
       "publicAddress",
       new File("sparkHome"),
       new File("workDir"),
-      "akka://worker",
+      "spark://worker",
       new SparkConf,
       Seq("localDir"),
       ExecutorState.RUNNING)
@@ -84,7 +84,7 @@ private[deploy] object DeployTestUtils {
       new File("sparkHome"),
       createDriverDesc(),
       null,
-      "akka://worker",
+      "spark://worker",
       new SecurityManager(conf))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index ab3d4ca..fdada07 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -544,7 +544,7 @@ class StandaloneDynamicAllocationSuite
       val endpointRef = mock(classOf[RpcEndpointRef])
       val mockAddress = mock(classOf[RpcAddress])
       when(endpointRef.address).thenReturn(mockAddress)
-      val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty)
+      val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
       val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index bd8b065..2a1696b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -34,7 +34,7 @@ class DriverRunnerTest extends SparkFunSuite {
     val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
     val conf = new SparkConf()
     new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
-      driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
+      driverDescription, null, "spark://1.2.3.4/worker/", new SecurityManager(conf))
   }
 
   private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 64e486d..6f4eda8 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -626,9 +626,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       val e = intercept[Exception] {
         Await.result(f, 1 seconds)
       }
-      assert(e.isInstanceOf[TimeoutException] || // For Akka
-        e.isInstanceOf[NotSerializableException] // For Netty
-      )
+      assert(e.isInstanceOf[NotSerializableException])
     } finally {
       anotherEnv.shutdown()
       anotherEnv.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 70f40fb..04cccc6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -18,16 +18,16 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer}
 
 class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext {
 
-  test("serialized task larger than akka frame size") {
+  test("serialized task larger than max RPC message size") {
     val conf = new SparkConf
-    conf.set("spark.akka.frameSize", "1")
+    conf.set("spark.rpc.message.maxSize", "1")
     conf.set("spark.default.parallelism", "1")
     sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
-    val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+    val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
     val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
     val larger = sc.parallelize(Seq(buffer))
     val thrown = intercept[SparkException] {

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index c87158d..58d217f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
   with ResetSystemProperties {
@@ -284,19 +284,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("onTaskGettingResult() called when result fetched remotely") {
-    val conf = new SparkConf().set("spark.akka.frameSize", "1")
+    val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
     sc = new SparkContext("local", "SparkListenerSuite", conf)
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
-    // Make a task whose result is larger than the akka frame size
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    assert(akkaFrameSize === 1024 * 1024)
+    // Make a task whose result is larger than the RPC message size
+    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+    assert(maxRpcMessageSize === 1024 * 1024)
     val result = sc.parallelize(Seq(1), 1)
-      .map { x => 1.to(akkaFrameSize).toArray }
+      .map { x => 1.to(maxRpcMessageSize).toArray }
       .reduce { case (x, y) => x }
-    assert(result === 1.to(akkaFrameSize).toArray)
+    assert(result === 1.to(maxRpcMessageSize).toArray)
 
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     val TASK_INDEX = 0
@@ -310,7 +309,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
-    // Make a task whose result is larger than the akka frame size
+    // Make a task whose result is larger than the RPC message size
     val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
     assert(result === 2)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index bc72c36..cc2557c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
 import org.apache.spark.storage.TaskResultBlockId
 import org.apache.spark.TestUtils.JavaSourceFromString
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
 
 /**
  * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
@@ -77,22 +77,22 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
  */
 class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
 
-  // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+  // Set the RPC message size to be as small as possible (it must be an integer, so 1 is as small
   // as we can make it) so the tests don't take too long.
-  def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
+  def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1")
 
-  test("handling results smaller than Akka frame size") {
+  test("handling results smaller than max RPC message size") {
     sc = new SparkContext("local", "test", conf)
     val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
     assert(result === 2)
   }
 
-  test("handling results larger than Akka frame size") {
+  test("handling results larger than max RPC message size") {
     sc = new SparkContext("local", "test", conf)
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
-    assert(result === 1.to(akkaFrameSize).toArray)
+    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+    val result =
+      sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
+    assert(result === 1.to(maxRpcMessageSize).toArray)
 
     val RESULT_BLOCK_ID = TaskResultBlockId(0)
     assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
@@ -114,11 +114,11 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     }
     val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
     scheduler.taskResultGetter = resultGetter
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+    val result =
+      sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
     assert(resultGetter.removeBlockSuccessfully)
-    assert(result === 1.to(akkaFrameSize).toArray)
+    assert(result === 1.to(maxRpcMessageSize).toArray)
 
     // Make sure two tasks were run (one failed one, and a second retried one).
     assert(scheduler.nextTaskId.get() === 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/dev/deps/spark-deps-hadoop-2.2
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 0760529..4d9937c 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -44,7 +41,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.4.0.jar
 curator-framework-2.4.0.jar
@@ -179,7 +175,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/dev/deps/spark-deps-hadoop-2.3
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 191f2a0..fd659ee 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -45,7 +42,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.4.0.jar
 curator-framework-2.4.0.jar
@@ -170,7 +166,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/dev/deps/spark-deps-hadoop-2.4
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 9134e99..afae3de 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -45,7 +42,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.4.0.jar
 curator-framework-2.4.0.jar
@@ -171,7 +167,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 8c45832..5a64601 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -49,7 +46,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.6.0.jar
 curator-framework-2.6.0.jar
@@ -177,7 +173,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 1d34854..70083e7 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
 RoaringBitmap-0.5.11.jar
 ST4-4.0.4.jar
 activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
 antlr-runtime-3.5.2.jar
 aopalliance-1.0.jar
 apache-log4j-extras-1.2.17.jar
@@ -49,7 +46,6 @@ commons-math3-3.4.1.jar
 commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
-config-1.2.1.jar
 core-1.1.2.jar
 curator-client-2.6.0.jar
 curator-framework-2.6.0.jar
@@ -178,7 +174,6 @@ stax-api-1.0-2.jar
 stax-api-1.0.1.jar
 stream-2.7.0.jar
 super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
 univocity-parsers-1.5.6.jar
 unused-1.0.0.jar
 xbean-asm5-shaded-4.4.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/docs/cluster-overview.md
----------------------------------------------------------------------
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index 2810112..814e440 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -35,7 +35,7 @@ There are several useful things to note about this architecture:
    processes, and these communicate with each other, it is relatively easy to run it even on a
    cluster manager that also supports other applications (e.g. Mesos/YARN).
 3. The driver program must listen for and accept incoming connections from its executors throughout
-   its lifetime (e.g., see [spark.driver.port and spark.fileserver.port in the network config
+   its lifetime (e.g., see [spark.driver.port in the network config
    section](configuration.html#networking)). As such, the driver program must be network
    addressable from the worker nodes.
 4. Because the driver schedules tasks on the cluster, it should be run close to the worker

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index acaeb83..d2a2f10 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -944,52 +944,12 @@ Apart from these, the following properties are also available, and may be useful
 <table class="table">
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
-  <td><code>spark.akka.frameSize</code></td>
+  <td><code>spark.rpc.message.maxSize</code></td>
   <td>128</td>
   <td>
     Maximum message size (in MB) to allow in "control plane" communication; generally only applies to map
     output size information sent between executors and the driver. Increase this if you are running
-    jobs with many thousands of map and reduce tasks and see messages about the frame size.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.akka.heartbeat.interval</code></td>
-  <td>1000s</td>
-  <td>
-    This is set to a larger value to disable the transport failure detector that comes built in to
-    Akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger
-    interval value reduces network overhead and a smaller value ( ~ 1 s) might be more
-    informative for Akka's failure detector. Tune this in combination of <code>spark.akka.heartbeat.pauses</code>
-    if you need to. A likely positive use case for using failure detector would be: a sensistive
-    failure detector can help evict rogue executors quickly. However this is usually not the case
-    as GC pauses and network lags are expected in a real Spark cluster. Apart from that enabling
-    this leads to a lot of exchanges of heart beats between nodes leading to flooding the network
-    with those.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.akka.heartbeat.pauses</code></td>
-  <td>6000s</td>
-  <td>
-     This is set to a larger value to disable the transport failure detector that comes built in to Akka.
-     It can be enabled again, if you plan to use this feature (Not recommended). Acceptable heart
-     beat pause for Akka. This can be used to control sensitivity to GC pauses. Tune
-     this along with <code>spark.akka.heartbeat.interval</code> if you need to.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.akka.threads</code></td>
-  <td>4</td>
-  <td>
-    Number of actor threads to use for communication. Can be useful to increase on large clusters
-    when the driver has a lot of CPU cores.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.akka.timeout</code></td>
-  <td>100s</td>
-  <td>
-    Communication timeout between Spark nodes.
+    jobs with many thousands of map and reduce tasks and see messages about the RPC message size.
   </td>
 </tr>
 <tr>
@@ -1016,27 +976,11 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.executor.port</code></td>
-  <td>(random)</td>
-  <td>
-    Port for the executor to listen on. This is used for communicating with the driver.
-    This is only relevant when using the Akka RPC backend.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.fileserver.port</code></td>
-  <td>(random)</td>
-  <td>
-    Port for the driver's HTTP file server to listen on.
-    This is only relevant when using the Akka RPC backend.
-  </td>
-</tr>
-<tr>
   <td><code>spark.network.timeout</code></td>
   <td>120s</td>
   <td>
     Default timeout for all network interactions. This config will be used in place of
-    <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
+    <code>spark.core.connection.ack.wait.timeout</code>,
     <code>spark.storage.blockManagerSlaveTimeoutMs</code>,
     <code>spark.shuffle.io.connectionTimeout</code>, <code>spark.rpc.askTimeout</code> or
     <code>spark.rpc.lookupTimeout</code> if they are not configured.
@@ -1418,8 +1362,7 @@ Apart from these, the following properties are also available, and may be useful
 
             <p>Use <code>spark.ssl.YYY.XXX</code> settings to overwrite the global configuration for
             particular protocol denoted by <code>YYY</code>. Currently <code>YYY</code> can be
-            either <code>akka</code> for Akka based connections or <code>fs</code> for file
-            server.</p>
+            only <code>fs</code> for file server.</p>
         </td>
     </tr>
     <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/bc1babd6/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
index a4cc0f4..32c33d2 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -27,8 +27,7 @@ If your applications are using event logging, the directory where the event logs
 
 ## Encryption
 
-Spark supports SSL for Akka and HTTP protocols. SASL encryption is supported for the block transfer
-service.
+Spark supports SSL for HTTP protocols. SASL encryption is supported for the block transfer service.
 
 Encryption is not yet supported for data stored by Spark in temporary local storage, such as shuffle
 files, cached data, and other application files. If encrypting this data is desired, a workaround is
@@ -49,10 +48,6 @@ component-specific configuration namespaces used to override the default setting
     <th>Component</th>
   </tr>
   <tr>
-    <td><code>spark.ssl.akka</code></td>
-    <td>Akka communication channels</td>
-  </tr>
-  <tr>
     <td><code>spark.ssl.fs</code></td>
     <td>HTTP file server and broadcast server</td>
   </tr>
@@ -137,7 +132,7 @@ configure those ports.
     <td>7077</td>
     <td>Submit job to cluster /<br> Join cluster</td>
     <td><code>SPARK_MASTER_PORT</code></td>
-    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+    <td>Set to "0" to choose a port randomly. Standalone mode only.</td>
   </tr>
   <tr>
     <td>Standalone Master</td>
@@ -145,7 +140,7 @@ configure those ports.
     <td>(random)</td>
     <td>Schedule executors</td>
     <td><code>SPARK_WORKER_PORT</code></td>
-    <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+    <td>Set to "0" to choose a port randomly. Standalone mode only.</td>
   </tr>
 </table>
 
@@ -178,24 +173,7 @@ configure those ports.
     <td>(random)</td>
     <td>Connect to application /<br> Notify executor state changes</td>
     <td><code>spark.driver.port</code></td>
-    <td>Akka-based. Set to "0" to choose a port randomly.</td>
-  </tr>
-  <tr>
-    <td>Driver</td>
-    <td>Executor</td>
-    <td>(random)</td>
-    <td>Schedule tasks</td>
-    <td><code>spark.executor.port</code></td>
-    <td>Akka-based. Set to "0" to choose a port randomly. Only used if Akka RPC backend is
-    configured.</td>
-  </tr>
-  <tr>
-    <td>Executor</td>
-    <td>Driver</td>
-    <td>(random)</td>
-    <td>File server for files and jars</td>
-    <td><code>spark.fileserver.port</code></td>
-    <td>Jetty-based. Only used if Akka RPC backend is configured.</td>
+    <td>Set to "0" to choose a port randomly.</td>
   </tr>
   <tr>
     <td>Executor / Driver</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org