Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/21 08:18:46 UTC

spark git commit: [SPARK-6490][Core] Add spark.rpc.* and deprecate spark.akka.*

Repository: spark
Updated Branches:
  refs/heads/master c736220da -> 8136810df


[SPARK-6490][Core] Add spark.rpc.* and deprecate spark.akka.*

Deprecated `spark.akka.num.retries`, `spark.akka.retry.wait`, `spark.akka.askTimeout`, and `spark.akka.lookupTimeout`, and added `spark.rpc.numRetries`, `spark.rpc.retry.wait`, `spark.rpc.askTimeout`, and `spark.rpc.lookupTimeout` in their place.
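
For example, an application can set the new keys directly, while the old `spark.akka.*` keys keep working through the alternate-config fallback in SparkConf and only trigger a deprecation warning. A minimal sketch against this commit (the values here are illustrative, not recommended settings):

    import org.apache.spark.SparkConf
    import org.apache.spark.util.RpcUtils

    val conf = new SparkConf()

    // New-style keys; time values may now carry units, since the helpers
    // read them via getTimeAsSeconds/getTimeAsMs.
    conf.set("spark.rpc.askTimeout", "30s")
    conf.set("spark.rpc.retry.wait", "3s")

    // Old-style key: still honored as an AlternateConfig fallback when the
    // new key is absent, with a deprecation warning.
    conf.set("spark.akka.lookupTimeout", "30")

    RpcUtils.askTimeout(conf)     // FiniteDuration: 30 seconds
    RpcUtils.retryWaitMs(conf)    // Long: 3000 (milliseconds)
    RpcUtils.lookupTimeout(conf)  // FiniteDuration: 30 seconds, via the akka key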

Author: zsxwing <zs...@gmail.com>

Closes #5595 from zsxwing/SPARK-6490 and squashes the following commits:

e0d80a9 [zsxwing] Use getTimeAsMs and getTimeAsSeconds and other minor fixes
31dbe69 [zsxwing] Add spark.rpc.* and deprecate spark.akka.*


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8136810d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8136810d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8136810d

Branch: refs/heads/master
Commit: 8136810dfad12008ac300116df7bc8448740f1ae
Parents: c736220
Author: zsxwing <zs...@gmail.com>
Authored: Mon Apr 20 23:18:42 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Apr 20 23:18:42 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala | 10 +++++++-
 .../scala/org/apache/spark/deploy/Client.scala  |  6 ++---
 .../apache/spark/deploy/client/AppClient.scala  |  4 +--
 .../org/apache/spark/deploy/master/Master.scala |  4 +--
 .../spark/deploy/master/ui/MasterWebUI.scala    |  4 +--
 .../deploy/rest/StandaloneRestServer.scala      |  8 +++---
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |  4 +--
 .../scala/org/apache/spark/rpc/RpcEnv.scala     | 10 ++++----
 .../cluster/YarnSchedulerBackend.scala          |  4 +--
 .../spark/storage/BlockManagerMaster.scala      |  4 +--
 .../scala/org/apache/spark/util/AkkaUtils.scala | 26 +++-----------------
 .../scala/org/apache/spark/util/RpcUtils.scala  | 23 +++++++++++++++++
 .../apache/spark/MapOutputTrackerSuite.scala    |  4 +--
 .../scala/org/apache/spark/SparkConfSuite.scala | 24 +++++++++++++++++-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      |  4 +--
 15 files changed, 86 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e3a649d..c1996e0 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -431,7 +431,15 @@ private[spark] object SparkConf extends Logging {
     "spark.yarn.am.waitTime" -> Seq(
       AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
         // Translate old value to a duration, with 10s wait time per try.
-        translation = s => s"${s.toLong * 10}s"))
+        translation = s => s"${s.toLong * 10}s")),
+    "spark.rpc.numRetries" -> Seq(
+      AlternateConfig("spark.akka.num.retries", "1.4")),
+    "spark.rpc.retry.wait" -> Seq(
+      AlternateConfig("spark.akka.retry.wait", "1.4")),
+    "spark.rpc.askTimeout" -> Seq(
+      AlternateConfig("spark.akka.askTimeout", "1.4")),
+    "spark.rpc.lookupTimeout" -> Seq(
+      AlternateConfig("spark.akka.lookupTimeout", "1.4"))
     )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 8d13b2a..c2c3e9a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,7 +27,7 @@ import org.apache.log4j.{Level, Logger}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}
 
 /**
  * Proxy that relays messages to the driver.
@@ -36,7 +36,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
   extends Actor with ActorLogReceive with Logging {
 
   var masterActor: ActorSelection = _
-  val timeout = AkkaUtils.askTimeout(conf)
+  val timeout = RpcUtils.askTimeout(conf)
 
   override def preStart(): Unit = {
     masterActor = context.actorSelection(
@@ -155,7 +155,7 @@ object Client {
     if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
       conf.set("spark.akka.logLifecycleEvents", "true")
     }
-    conf.set("spark.akka.askTimeout", "10")
+    conf.set("spark.rpc.askTimeout", "10")
     conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 4f06d7f..43c8a93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -193,7 +193,7 @@ private[spark] class AppClient(
   def stop() {
     if (actor != null) {
       try {
-        val timeout = AkkaUtils.askTimeout(conf)
+        val timeout = RpcUtils.askTimeout(conf)
         val future = actor.ask(StopAppClient)(timeout)
         Await.result(future, timeout)
       } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c5a6b1b..ff2eed6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -47,7 +47,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger, Utils}
 
 private[master] class Master(
     host: String,
@@ -931,7 +931,7 @@ private[deploy] object Master extends Logging {
       securityManager = securityMgr)
     val actor = actorSystem.actorOf(
       Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
-    val timeout = AkkaUtils.askTimeout(conf)
+    val timeout = RpcUtils.askTimeout(conf)
     val portsRequest = actor.ask(BoundPortsRequest)(timeout)
     val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
     (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index bb11e06..aad9c87 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -21,7 +21,7 @@ import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils
 
 /**
  * Web UI server for the standalone master.
@@ -31,7 +31,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
   extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
 
   val masterActorRef = master.self
-  val timeout = AkkaUtils.askTimeout(master.conf)
+  val timeout = RpcUtils.askTimeout(master.conf)
   val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
 
   initialize()

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 4f19af5..2d6b8d4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -32,7 +32,7 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
 import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
 import org.apache.spark.deploy.ClientArguments._
 
@@ -223,7 +223,7 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
   }
 
   protected def handleKill(submissionId: String): KillSubmissionResponse = {
-    val askTimeout = AkkaUtils.askTimeout(conf)
+    val askTimeout = RpcUtils.askTimeout(conf)
     val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
       DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
     val k = new KillSubmissionResponse
@@ -257,7 +257,7 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
   }
 
   protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
-    val askTimeout = AkkaUtils.askTimeout(conf)
+    val askTimeout = RpcUtils.askTimeout(conf)
     val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
       DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
     val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
@@ -321,7 +321,7 @@ private[rest] class SubmitRequestServlet(
       responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
     requestMessage match {
       case submitRequest: CreateSubmissionRequest =>
-        val askTimeout = AkkaUtils.askTimeout(conf)
+        val askTimeout = RpcUtils.askTimeout(conf)
         val driverDescription = buildDriverDescription(submitRequest)
         val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
           DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index de6423b..b3bb5f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.deploy.worker.ui.WorkerWebUI._
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils
 
 /**
  * Web UI server for the standalone worker.
@@ -38,7 +38,7 @@ class WorkerWebUI(
   extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
   with Logging {
 
-  private[ui] val timeout = AkkaUtils.askTimeout(worker.conf)
+  private[ui] val timeout = RpcUtils.askTimeout(worker.conf)
 
   initialize()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index cba038c..a5336b7 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -25,7 +25,7 @@ import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
 
 /**
 * An RPC environment. [[RpcEndpoint]]s need to register themselves with a name to [[RpcEnv]] to
@@ -38,7 +38,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 private[spark] abstract class RpcEnv(conf: SparkConf) {
 
-  private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+  private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)
 
   /**
    * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -282,9 +282,9 @@ trait ThreadSafeRpcEndpoint extends RpcEndpoint
 private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
   extends Serializable with Logging {
 
-  private[this] val maxRetries = conf.getInt("spark.akka.num.retries", 3)
-  private[this] val retryWaitMs = conf.getLong("spark.akka.retry.wait", 3000)
-  private[this] val defaultAskTimeout = conf.getLong("spark.akka.askTimeout", 30) seconds
+  private[this] val maxRetries = RpcUtils.numRetries(conf)
+  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
+  private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
 
   /**
    * return the address for the [[RpcEndpointRef]]

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index f72566c..1406a36 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
 
 import scala.util.control.NonFatal
 
@@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend(
   private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
     YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
 
-  private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
+  private implicit val askTimeout = RpcUtils.askTimeout(sc.conf)
 
   /**
    * Request executors from the ApplicationMaster by specifying the total number desired.

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index ceacf04..c798843 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -23,7 +23,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils
 
 private[spark]
 class BlockManagerMaster(
@@ -32,7 +32,7 @@ class BlockManagerMaster(
     isDriver: Boolean)
   extends Logging {
 
-  val timeout = AkkaUtils.askTimeout(conf)
+  val timeout = RpcUtils.askTimeout(conf)
 
   /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 8e8cc7c..b725df3 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.concurrent.Await
-import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent.duration.FiniteDuration
 
 import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
@@ -125,16 +125,6 @@ private[spark] object AkkaUtils extends Logging {
     (actorSystem, boundPort)
   }
 
-  /** Returns the default Spark timeout to use for Akka ask operations. */
-  def askTimeout(conf: SparkConf): FiniteDuration = {
-    Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds")
-  }
-
-  /** Returns the default Spark timeout to use for Akka remote actor lookup. */
-  def lookupTimeout(conf: SparkConf): FiniteDuration = {
-    Duration.create(conf.getLong("spark.akka.lookupTimeout", 30), "seconds")
-  }
-
   private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
 
   /** Returns the configured max frame size for Akka messages in bytes. */
@@ -150,16 +140,6 @@ private[spark] object AkkaUtils extends Logging {
   /** Space reserved for extra data in an Akka message besides serialized task or task result. */
   val reservedSizeBytes = 200 * 1024
 
-  /** Returns the configured number of times to retry connecting */
-  def numRetries(conf: SparkConf): Int = {
-    conf.getInt("spark.akka.num.retries", 3)
-  }
-
-  /** Returns the configured number of milliseconds to wait on each retry */
-  def retryWaitMs(conf: SparkConf): Int = {
-    conf.getInt("spark.akka.retry.wait", 3000)
-  }
-
   /**
    * Send a message to the given actor and get its result within a default timeout, or
    * throw a SparkException if this fails.
@@ -216,7 +196,7 @@ private[spark] object AkkaUtils extends Logging {
     val driverPort: Int = conf.getInt("spark.driver.port", 7077)
     Utils.checkHost(driverHost, "Expected hostname")
     val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
-    val timeout = AkkaUtils.lookupTimeout(conf)
+    val timeout = RpcUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
   }
@@ -230,7 +210,7 @@ private[spark] object AkkaUtils extends Logging {
     val executorActorSystemName = SparkEnv.executorActorSystemName
     Utils.checkHost(host, "Expected hostname")
     val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
-    val timeout = AkkaUtils.lookupTimeout(conf)
+    val timeout = RpcUtils.lookupTimeout(conf)
     logInfo(s"Connecting to $name: $url")
     Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index 6665b17..5ae793e 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.util
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
 import org.apache.spark.{SparkEnv, SparkConf}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 
@@ -32,4 +35,24 @@ object RpcUtils {
     Utils.checkHost(driverHost, "Expected hostname")
     rpcEnv.setupEndpointRef(driverActorSystemName, RpcAddress(driverHost, driverPort), name)
   }
+
+  /** Returns the configured number of times to retry connecting */
+  def numRetries(conf: SparkConf): Int = {
+    conf.getInt("spark.rpc.numRetries", 3)
+  }
+
+  /** Returns the configured number of milliseconds to wait on each retry */
+  def retryWaitMs(conf: SparkConf): Long = {
+    conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
+  }
+
+  /** Returns the default Spark timeout to use for RPC ask operations. */
+  def askTimeout(conf: SparkConf): FiniteDuration = {
+    conf.getTimeAsSeconds("spark.rpc.askTimeout", "30s") seconds
+  }
+
+  /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
+  def lookupTimeout(conf: SparkConf): FiniteDuration = {
+    conf.getTimeAsSeconds("spark.rpc.lookupTimeout", "30s") seconds
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6295d34..6ed057a 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,7 +154,7 @@ class MapOutputTrackerSuite extends FunSuite {
   test("remote fetch below akka frame size") {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
-    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+    newConf.set("spark.rpc.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     val rpcEnv = createRpcEnv("spark")
@@ -180,7 +180,7 @@ class MapOutputTrackerSuite extends FunSuite {
   test("remote fetch exceeds akka frame size") {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
-    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+    newConf.set("spark.rpc.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     val rpcEnv = createRpcEnv("test")

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 8e6c200..d7d8014 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark
 
 import java.util.concurrent.{TimeUnit, Executors}
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
 import scala.util.{Try, Random}
 
 import org.scalatest.FunSuite
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
 import com.esotericsoftware.kryo.Kryo
 
 class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
@@ -222,6 +224,26 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
     assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
   }
 
+  test("akka deprecated configs") {
+    val conf = new SparkConf()
+
+    assert(!conf.contains("spark.rpc.num.retries"))
+    assert(!conf.contains("spark.rpc.retry.wait"))
+    assert(!conf.contains("spark.rpc.askTimeout"))
+    assert(!conf.contains("spark.rpc.lookupTimeout"))
+
+    conf.set("spark.akka.num.retries", "1")
+    assert(RpcUtils.numRetries(conf) === 1)
+
+    conf.set("spark.akka.retry.wait", "2")
+    assert(RpcUtils.retryWaitMs(conf) === 2L)
+
+    conf.set("spark.akka.askTimeout", "3")
+    assert(RpcUtils.askTimeout(conf) === (3 seconds))
+
+    conf.set("spark.akka.lookupTimeout", "4")
+    assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
+  }
 }
 
 class Class1 {}

http://git-wip-us.apache.org/repos/asf/spark/blob/8136810d/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index ada07ef..5fbda37 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -155,8 +155,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     })
 
     val conf = new SparkConf()
-    conf.set("spark.akka.retry.wait", "0")
-    conf.set("spark.akka.num.retries", "1")
+    conf.set("spark.rpc.retry.wait", "0")
+    conf.set("spark.rpc.num.retries", "1")
     val anotherEnv = createRpcEnv(conf, "remote", 13345)
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")

