Posted to commits@spark.apache.org by ir...@apache.org on 2015/07/03 04:35:41 UTC

spark git commit: [SPARK-6980] [CORE] Akka timeout exceptions indicate which conf controls them (RPC Layer)

Repository: spark
Updated Branches:
  refs/heads/master d9838196f -> aa7bbc143


[SPARK-6980] [CORE] Akka timeout exceptions indicate which conf controls them (RPC Layer)

Latest changes after refactoring to the RPC layer. I rebased against trunk to make sure to pick up any recent changes, since it had been a while. I wasn't crazy about the name `ConfigureTimeout`, and `RpcTimeout` seemed to fit better, but I'm open to suggestions!

I ran most of the tests and they pass, but others would get stuck with "WARN TaskSchedulerImpl: Initial job has not accepted any resources". I think it's just my machine, so I thought I would push what I have anyway.

Still left to do:
* I only added a couple of unit tests so far; there are probably more cases to test
* Make sure all uses require a `RpcTimeout`
* Right now, both `ask` and `Await.result` use the same timeout; should we differentiate between them in the TimeoutException message?
* I wrapped `Await.result` in `RpcTimeout`, should we also wrap `Await.ready`?
* Proper scoping of classes and methods

hardmettle, feel free to help out with any of these!
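
To illustrate the pattern this change introduces, here is a rough usage sketch based on the diff below. The conf key and default are the ones `RpcUtils.askRpcTimeout` uses; the future and reply are made up for illustration, and since `RpcTimeout` is `private[spark]` this would only compile inside Spark itself:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    import org.apache.spark.SparkConf
    import org.apache.spark.rpc.RpcTimeout

    // Build a timeout that remembers which conf key controls it (falling back to 120s).
    val conf = new SparkConf()
    val askTimeout = RpcTimeout(conf, "spark.rpc.askTimeout", "120s")

    // awaitResult rethrows a TimeoutException as an RpcTimeoutException whose message
    // names "spark.rpc.askTimeout" instead of a bare "Futures timed out" message.
    val reply: String = askTimeout.awaitResult(Future { "pong" })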

Author: Bryan Cutler <bj...@us.ibm.com>
Author: Harsh Gupta <ha...@Harshs-MacBook-Pro.local>
Author: BryanCutler <cu...@gmail.com>

Closes #6205 from BryanCutler/configTimeout-6980 and squashes the following commits:

46c8d48 [Bryan Cutler] [SPARK-6980] Changed RpcEnvSuite test to never reply instead of just sleeping, to avoid possible sync issues
06afa53 [Bryan Cutler] [SPARK-6980] RpcTimeout class extends Serializable, was causing error in MasterSuite
7bb70f1 [Bryan Cutler] Merge branch 'master' into configTimeout-6980
dbd5f73 [Bryan Cutler] [SPARK-6980] Changed RpcUtils askRpcTimeout and lookupRpcTimeout scope to private[spark] and improved deprecation warning msg
4e89c75 [Bryan Cutler] [SPARK-6980] Missed one usage of deprecated RpcUtils.askTimeout in YarnSchedulerBackend although it is not being used, and fixed SparkConfSuite UT to not use deprecated RpcUtils functions
6a1c50d [Bryan Cutler] [SPARK-6980] Minor cleanup of test case
7f4d78e [Bryan Cutler] [SPARK-6980] Fixed scala style checks
287059a [Bryan Cutler] [SPARK-6980] Removed extra import in AkkaRpcEnvSuite
3d8b1ff [Bryan Cutler] [SPARK-6980] Cleaned up imports in AkkaRpcEnvSuite
3a168c7 [Bryan Cutler] [SPARK-6980] Rewrote Akka RpcTimeout UTs in RpcEnvSuite
7636189 [Bryan Cutler] [SPARK-6980] Fixed call to askWithReply in DAGScheduler to use RpcTimeout - this was being compiled by auto-tupling and changing the message type of BlockManagerHeartbeat
be11c4e [Bryan Cutler] Merge branch 'master' into configTimeout-6980
039afed [Bryan Cutler] [SPARK-6980] Corrected import organization
218aa50 [Bryan Cutler] [SPARK-6980] Corrected issues from feedback
fadaf6f [Bryan Cutler] [SPARK-6980] Put back in deprecated RpcUtils askTimeout and lookupTimeout to fix MiMa errors
fa6ed82 [Bryan Cutler] [SPARK-6980] Had to increase timeout on positive test case because a processor slowdown could trigger a Future TimeoutException
b05d449 [Bryan Cutler] [SPARK-6980] Changed constructor to use val duration instead of getter function, changed name of string property from conf to timeoutProp for consistency
c6cfd33 [Bryan Cutler] [SPARK-6980] Changed UT ask message timeout to explicitly intercept a SparkException
1394de6 [Bryan Cutler] [SPARK-6980] Moved MessagePrefix to createRpcTimeoutException directly
1517721 [Bryan Cutler] [SPARK-6980] RpcTimeout object scope should be private[spark]
2206b4d [Bryan Cutler] [SPARK-6980] Added unit test for ask then immediate awaitReply
1b9beab [Bryan Cutler] [SPARK-6980] Cleaned up import ordering
08f5afc [Bryan Cutler] [SPARK-6980] Added UT for constructing RpcTimeout with default value
d3754d1 [Bryan Cutler] [SPARK-6980] Added akkaConf to prevent dead letter logging
995d196 [Bryan Cutler] [SPARK-6980] Cleaned up import ordering, comments, spacing from PR feedback
7774d56 [Bryan Cutler] [SPARK-6980] Cleaned up UT imports
4351c48 [Bryan Cutler] [SPARK-6980] Added UT for addMessageIfTimeout, cleaned up UTs
1607a5f [Bryan Cutler] [SPARK-6980] Changed addMessageIfTimeout to PartialFunction, cleanup from PR comments
2f94095 [Bryan Cutler] [SPARK-6980] Added addMessageIfTimeout for when a Future is completed with TimeoutException
235919b [Bryan Cutler] [SPARK-6980] Resolved conflicts after master merge
c07d05c [Bryan Cutler] Merge branch 'master' into configTimeout-6980-tmp
b7fb99f [BryanCutler] Merge pull request #2 from hardmettle/configTimeoutUpdates_6980
4be3a8d [Harsh Gupta] Modifying loop condition to find property match
0ee5642 [Harsh Gupta] Changing the loop condition to halt at the first match in the property list for RpcEnv exception catch
f74064d [Harsh Gupta] Retrieving properties from property list using iterator and while loop instead of chained functions
a294569 [Bryan Cutler] [SPARK-6980] Added creation of RpcTimeout with Seq of property keys
23d2f26 [Bryan Cutler] [SPARK-6980] Fixed await result not being handled by RpcTimeout
49f9f04 [Bryan Cutler] [SPARK-6980] Minor cleanup and scala style fix
5b59a44 [Bryan Cutler] [SPARK-6980] Added some RpcTimeout unit tests
78a2c0a [Bryan Cutler] [SPARK-6980] Using RpcTimeout.awaitResult for future in AppClient now
97523e0 [Bryan Cutler] [SPARK-6980] Akka ask timeout description refactored to RPC layer


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa7bbc14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa7bbc14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa7bbc14

Branch: refs/heads/master
Commit: aa7bbc143844020e4711b3aa4ce75c1b7733a80d
Parents: d983819
Author: Bryan Cutler <bj...@us.ibm.com>
Authored: Thu Jul 2 21:38:21 2015 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Thu Jul 2 21:38:21 2015 -0500

----------------------------------------------------------------------
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |   2 +-
 .../org/apache/spark/rpc/RpcEndpointRef.scala   |  17 +--
 .../scala/org/apache/spark/rpc/RpcEnv.scala     | 112 ++++++++++++++++++-
 .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala  |  15 ++-
 .../apache/spark/scheduler/DAGScheduler.scala   |   3 +-
 .../cluster/YarnSchedulerBackend.scala          |   2 +-
 .../spark/storage/BlockManagerMaster.scala      |  14 +--
 .../scala/org/apache/spark/util/AkkaUtils.scala |  19 ++--
 .../scala/org/apache/spark/util/RpcUtils.scala  |  20 +++-
 .../scala/org/apache/spark/SparkConfSuite.scala |   4 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      |  97 +++++++++++++++-
 11 files changed, 258 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index b3bb5f9..334a5b1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -38,7 +38,7 @@ class WorkerWebUI(
   extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
   with Logging {
 
-  private[ui] val timeout = RpcUtils.askTimeout(worker.conf)
+  private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)
 
   initialize()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 69181ed..6ae4789 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.rpc
 
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.Future
 import scala.reflect.ClassTag
 
 import org.apache.spark.util.RpcUtils
@@ -32,7 +31,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
 
   private[this] val maxRetries = RpcUtils.numRetries(conf)
   private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
-  private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
+  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
 
   /**
    * return the address for the [[RpcEndpointRef]]
@@ -52,7 +51,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
    *
    * This method only sends the message once and never retries.
    */
-  def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
+  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
 
   /**
    * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
@@ -91,7 +90,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
    * @tparam T type of the reply message
    * @return the reply message from the corresponding [[RpcEndpoint]]
    */
-  def askWithRetry[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
+  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
     // TODO: Consider removing multiple attempts
     var attempts = 0
     var lastException: Exception = null
@@ -99,7 +98,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
       attempts += 1
       try {
         val future = ask[T](message, timeout)
-        val result = Await.result(future, timeout)
+        val result = timeout.awaitResult(future)
         if (result == null) {
           throw new SparkException("Actor returned null")
         }
@@ -110,10 +109,14 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
           lastException = e
           logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
       }
-      Thread.sleep(retryWaitMs)
+
+      if (attempts < maxRetries) {
+        Thread.sleep(retryWaitMs)
+      }
     }
 
     throw new SparkException(
       s"Error sending message [message = $message]", lastException)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 3b6938e..1709bdf 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -18,8 +18,10 @@
 package org.apache.spark.rpc
 
 import java.net.URI
+import java.util.concurrent.TimeoutException
 
-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Awaitable, Await, Future}
+import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.spark.{SecurityManager, SparkConf}
@@ -66,7 +68,7 @@ private[spark] object RpcEnv {
  */
 private[spark] abstract class RpcEnv(conf: SparkConf) {
 
-  private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)
+  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
 
   /**
    * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -94,7 +96,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
    * Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
    */
   def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
-    Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout)
+    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
   }
 
   /**
@@ -184,3 +186,107 @@ private[spark] object RpcAddress {
     RpcAddress(host, port)
   }
 }
+
+
+/**
+ * An exception thrown if RpcTimeout modifies a [[TimeoutException]].
+ */
+private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
+  extends TimeoutException(message) { initCause(cause) }
+
+
+/**
+ * Associates a timeout with a description so that when a TimeoutException occurs, additional
+ * context about the timeout can be added to the exception message.
+ * @param duration timeout duration in seconds
+ * @param timeoutProp the configuration property that controls this timeout
+ */
+private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)
+  extends Serializable {
+
+  /** Amends the standard message of TimeoutException to include the description */
+  private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
+    new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + timeoutProp, te)
+  }
+
+  /**
+   * PartialFunction to match a TimeoutException and add the timeout description to the message
+   *
+   * @note This can be used in the recover callback of a Future to add to a TimeoutException
+   * Example:
+   *    val timeout = new RpcTimeout(5 millis, "short timeout")
+   *    Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
+   */
+  def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
+    // The exception has already been converted to a RpcTimeoutException so just raise it
+    case rte: RpcTimeoutException => throw rte
+    // Any other TimeoutException gets converted to a RpcTimeoutException with modified message
+    case te: TimeoutException => throw createRpcTimeoutException(te)
+  }
+
+  /**
+   * Wait for the completed result and return it. If the result is not available within this
+   * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
+   * @param  awaitable  the `Awaitable` to be awaited
+   * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
+   *         is still not ready
+   */
+  def awaitResult[T](awaitable: Awaitable[T]): T = {
+    try {
+      Await.result(awaitable, duration)
+    } catch addMessageIfTimeout
+  }
+}
+
+
+private[spark] object RpcTimeout {
+
+  /**
+   * Lookup the timeout property in the configuration and create
+   * a RpcTimeout with the property key in the description.
+   * @param conf configuration properties containing the timeout
+   * @param timeoutProp property key for the timeout in seconds
+   * @throws NoSuchElementException if property is not set
+   */
+  def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
+    val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds }
+    new RpcTimeout(timeout, timeoutProp)
+  }
+
+  /**
+   * Lookup the timeout property in the configuration and create
+   * a RpcTimeout with the property key in the description.
+   * Uses the given default value if property is not set
+   * @param conf configuration properties containing the timeout
+   * @param timeoutProp property key for the timeout in seconds
+   * @param defaultValue default timeout value in seconds if property not found
+   */
+  def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = {
+    val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
+    new RpcTimeout(timeout, timeoutProp)
+  }
+
+  /**
+   * Lookup prioritized list of timeout properties in the configuration
+   * and create a RpcTimeout with the first set property key in the
+   * description.
+   * Uses the given default value if property is not set
+   * @param conf configuration properties containing the timeout
+   * @param timeoutPropList prioritized list of property keys for the timeout in seconds
+   * @param defaultValue default timeout value in seconds if no properties found
+   */
+  def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
+    require(timeoutPropList.nonEmpty)
+
+    // Find the first set property or use the default value with the first property
+    val itr = timeoutPropList.iterator
+    var foundProp: Option[(String, String)] = None
+    while (itr.hasNext && foundProp.isEmpty){
+      val propKey = itr.next()
+      conf.getOption(propKey).foreach { prop => foundProp = Some(propKey, prop) }
+    }
+    val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
+    val timeout = { Utils.timeStringAsSeconds(finalProp._2) seconds }
+    new RpcTimeout(timeout, finalProp._1)
+  }
+}
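
For readers of the new `RpcTimeout` companion object above, a brief sketch of the prioritized-property factory and the `addMessageIfTimeout` recover hook. The conf keys are the real ones used by `RpcUtils`; the future is illustrative only, and since these classes are `private[spark]` the sketch assumes code running inside Spark:

    import java.util.concurrent.TimeoutException

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    import org.apache.spark.SparkConf
    import org.apache.spark.rpc.RpcTimeout

    // spark.rpc.askTimeout is unset, so the factory falls back to
    // spark.network.timeout and records that key as the timeout's description.
    val conf = new SparkConf().set("spark.network.timeout", "30s")
    val timeout = RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")

    // addMessageIfTimeout can be attached as a recover callback so that a
    // TimeoutException completing a Future also names the controlling conf key.
    val failed = Future[String] { throw new TimeoutException("no reply") }
      .recover(timeout.addMessageIfTimeout)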

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index 31ebe5a..f2d87f6 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -20,7 +20,6 @@ package org.apache.spark.rpc.akka
 import java.util.concurrent.ConcurrentHashMap
 
 import scala.concurrent.Future
-import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -214,8 +213,11 @@ private[spark] class AkkaRpcEnv private[akka] (
 
   override def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
     import actorSystem.dispatcher
-    actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout).
-      map(new AkkaRpcEndpointRef(defaultAddress, _, conf))
+    actorSystem.actorSelection(uri).resolveOne(defaultLookupTimeout.duration).
+      map(new AkkaRpcEndpointRef(defaultAddress, _, conf)).
+      // this is just in case there is a timeout from creating the future in resolveOne; we want the
+      // exception to indicate the conf that determines the timeout
+      recover(defaultLookupTimeout.addMessageIfTimeout)
   }
 
   override def uriOf(systemName: String, address: RpcAddress, endpointName: String): String = {
@@ -295,8 +297,8 @@ private[akka] class AkkaRpcEndpointRef(
     actorRef ! AkkaMessage(message, false)
   }
 
-  override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
-    actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
+  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+    actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
       // The function will run in the calling thread, so it should be short and never block.
       case msg @ AkkaMessage(message, reply) =>
         if (reply) {
@@ -307,7 +309,8 @@ private[akka] class AkkaRpcEndpointRef(
         }
       case AkkaFailure(e) =>
         Future.failed(e)
-    }(ThreadUtils.sameThread).mapTo[T]
+    }(ThreadUtils.sameThread).mapTo[T].
+    recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
   }
 
   override def toString: String = s"${getClass.getSimpleName}($actorRef)"

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a7cf0c2..6841fa8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util._
@@ -188,7 +189,7 @@ class DAGScheduler(
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
     blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
-      BlockManagerHeartbeat(blockManagerId), 600 seconds)
+      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }
 
   // Called by TaskScheduler when an executor fails.
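
As the DAGScheduler hunk above shows, an RpcTimeout can also be constructed directly with a duration and a plain description when no conf key applies. A minimal sketch (the label is just whatever the call site passes, here the same one used above):

    import scala.concurrent.duration._

    import org.apache.spark.rpc.RpcTimeout

    // The second argument is only a label for the exception message here,
    // not a configuration property.
    val heartbeatTimeout = new RpcTimeout(600.seconds, "BlockManagerHeartbeat")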

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 190ff61..bc67abb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend(
   private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
     YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
 
-  private implicit val askTimeout = RpcUtils.askTimeout(sc.conf)
+  private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
 
   /**
    * Request executors from the ApplicationMaster by specifying the total number desired.

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 7cdae22..f70f701 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -33,7 +33,7 @@ class BlockManagerMaster(
     isDriver: Boolean)
   extends Logging {
 
-  val timeout = RpcUtils.askTimeout(conf)
+  val timeout = RpcUtils.askRpcTimeout(conf)
 
   /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {
@@ -106,7 +106,7 @@ class BlockManagerMaster(
         logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
     }(ThreadUtils.sameThread)
     if (blocking) {
-      Await.result(future, timeout)
+      timeout.awaitResult(future)
     }
   }
 
@@ -118,7 +118,7 @@ class BlockManagerMaster(
         logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
     }(ThreadUtils.sameThread)
     if (blocking) {
-      Await.result(future, timeout)
+      timeout.awaitResult(future)
     }
   }
 
@@ -132,7 +132,7 @@ class BlockManagerMaster(
           s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
     }(ThreadUtils.sameThread)
     if (blocking) {
-      Await.result(future, timeout)
+      timeout.awaitResult(future)
     }
   }
 
@@ -176,8 +176,8 @@ class BlockManagerMaster(
         CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
         Option[BlockStatus],
         Iterable[Option[BlockStatus]]]]
-    val blockStatus = Await.result(
-      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
+    val blockStatus = timeout.awaitResult(
+      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread))
     if (blockStatus == null) {
       throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
     }
@@ -199,7 +199,7 @@ class BlockManagerMaster(
       askSlaves: Boolean): Seq[BlockId] = {
     val msg = GetMatchingBlockIds(filter, askSlaves)
     val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
-    Await.result(future, timeout)
+    timeout.awaitResult(future)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 96aa2fe..c179833 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.util
 
 import scala.collection.JavaConversions.mapAsJavaMap
-import scala.concurrent.Await
-import scala.concurrent.duration.FiniteDuration
 
 import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
@@ -28,6 +26,7 @@ import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
+import org.apache.spark.rpc.RpcTimeout
 
 /**
  * Various utility classes for working with Akka.
@@ -147,7 +146,7 @@ private[spark] object AkkaUtils extends Logging {
   def askWithReply[T](
       message: Any,
       actor: ActorRef,
-      timeout: FiniteDuration): T = {
+      timeout: RpcTimeout): T = {
     askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout)
   }
 
@@ -160,7 +159,7 @@ private[spark] object AkkaUtils extends Logging {
       actor: ActorRef,
       maxAttempts: Int,
       retryInterval: Long,
-      timeout: FiniteDuration): T = {
+      timeout: RpcTimeout): T = {
     // TODO: Consider removing multiple attempts
     if (actor == null) {
       throw new SparkException(s"Error sending message [message = $message]" +
@@ -171,8 +170,8 @@ private[spark] object AkkaUtils extends Logging {
     while (attempts < maxAttempts) {
       attempts += 1
       try {
-        val future = actor.ask(message)(timeout)
-        val result = Await.result(future, timeout)
+        val future = actor.ask(message)(timeout.duration)
+        val result = timeout.awaitResult(future)
         if (result == null) {
           throw new SparkException("Actor returned null")
         }
@@ -198,9 +197,9 @@ private[spark] object AkkaUtils extends Logging {
     val driverPort: Int = conf.getInt("spark.driver.port", 7077)
     Utils.checkHost(driverHost, "Expected hostname")
     val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
-    val timeout = RpcUtils.lookupTimeout(conf)
+    val timeout = RpcUtils.lookupRpcTimeout(conf)
     logInfo(s"Connecting to $name: $url")
-    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
+    timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
   }
 
   def makeExecutorRef(
@@ -212,9 +211,9 @@ private[spark] object AkkaUtils extends Logging {
     val executorActorSystemName = SparkEnv.executorActorSystemName
     Utils.checkHost(host, "Expected hostname")
     val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
-    val timeout = RpcUtils.lookupTimeout(conf)
+    val timeout = RpcUtils.lookupRpcTimeout(conf)
     logInfo(s"Connecting to $name: $url")
-    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
+    timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
   }
 
   def protocol(actorSystem: ActorSystem): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index f16cc8e..7578a3b 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.util
 
-import scala.concurrent.duration._
+import scala.concurrent.duration.FiniteDuration
 import scala.language.postfixOps
 
 import org.apache.spark.{SparkEnv, SparkConf}
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
 
 object RpcUtils {
 
@@ -47,14 +47,22 @@ object RpcUtils {
   }
 
   /** Returns the default Spark timeout to use for RPC ask operations. */
+  private[spark] def askRpcTimeout(conf: SparkConf): RpcTimeout = {
+    RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
+  }
+
+  @deprecated("use askRpcTimeout instead, this method was not intended to be public", "1.5.0")
   def askTimeout(conf: SparkConf): FiniteDuration = {
-    conf.getTimeAsSeconds("spark.rpc.askTimeout",
-      conf.get("spark.network.timeout", "120s")) seconds
+    askRpcTimeout(conf).duration
   }
 
   /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
+  private[spark] def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
+    RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
+  }
+
+  @deprecated("use lookupRpcTimeout instead, this method was not intended to be public", "1.5.0")
   def lookupTimeout(conf: SparkConf): FiniteDuration = {
-    conf.getTimeAsSeconds("spark.rpc.lookupTimeout",
-      conf.get("spark.network.timeout", "120s")) seconds
+    lookupRpcTimeout(conf).duration
   }
 }
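
A small sketch of how the new and deprecated RpcUtils entry points relate. The conf value is made up, and askRpcTimeout is private[spark], so as with the other sketches this assumes code inside Spark:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.RpcUtils

    val conf = new SparkConf().set("spark.network.timeout", "200s")

    // New API: an RpcTimeout that also remembers which property controlled it.
    val rt = RpcUtils.askRpcTimeout(conf)  // rt.timeoutProp == "spark.network.timeout"
    // Deprecated shim: only the FiniteDuration, now delegating to askRpcTimeout.
    val d = RpcUtils.askTimeout(conf)
    assert(rt.duration == d)  // both resolve to 200 seconds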

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 9fbaeb3..90cb7da 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -260,10 +260,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(RpcUtils.retryWaitMs(conf) === 2L)
 
     conf.set("spark.akka.askTimeout", "3")
-    assert(RpcUtils.askTimeout(conf) === (3 seconds))
+    assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))
 
     conf.set("spark.akka.lookupTimeout", "4")
-    assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
+    assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/aa7bbc14/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 1f0aa75..6ceafe4 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -155,16 +155,21 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     })
 
     val conf = new SparkConf()
+    val shortProp = "spark.rpc.short.timeout"
     conf.set("spark.rpc.retry.wait", "0")
     conf.set("spark.rpc.numRetries", "1")
     val anotherEnv = createRpcEnv(conf, "remote", 13345)
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
     try {
-      val e = intercept[Exception] {
-        rpcEndpointRef.askWithRetry[String]("hello", 1 millis)
+      // Any exception thrown in askWithRetry is wrapped with a SparkException and set as the cause
+      val e = intercept[SparkException] {
+        rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1 millis, shortProp))
       }
-      assert(e.isInstanceOf[TimeoutException] || e.getCause.isInstanceOf[TimeoutException])
+      // The SparkException cause should be a RpcTimeoutException with message indicating the
+      // controlling timeout property
+      assert(e.getCause.isInstanceOf[RpcTimeoutException])
+      assert(e.getCause.getMessage.contains(shortProp))
     } finally {
       anotherEnv.shutdown()
       anotherEnv.awaitTermination()
@@ -539,6 +544,92 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("construct RpcTimeout with conf property") {
+    val conf = new SparkConf
+
+    val testProp = "spark.ask.test.timeout"
+    val testDurationSeconds = 30
+    val secondaryProp = "spark.ask.secondary.timeout"
+
+    conf.set(testProp, s"${testDurationSeconds}s")
+    conf.set(secondaryProp, "100s")
+
+    // Construct RpcTimeout with a single property
+    val rt1 = RpcTimeout(conf, testProp)
+    assert( testDurationSeconds === rt1.duration.toSeconds )
+
+    // Construct RpcTimeout with prioritized list of properties
+    val rt2 = RpcTimeout(conf, Seq("spark.ask.invalid.timeout", testProp, secondaryProp), "1s")
+    assert( testDurationSeconds === rt2.duration.toSeconds )
+
+    // Construct RpcTimeout with default value,
+    val defaultProp = "spark.ask.default.timeout"
+    val defaultDurationSeconds = 1
+    val rt3 = RpcTimeout(conf, Seq(defaultProp), defaultDurationSeconds.toString + "s")
+    assert( defaultDurationSeconds === rt3.duration.toSeconds )
+    assert( rt3.timeoutProp.contains(defaultProp) )
+
+    // Try to construct RpcTimeout with an unconfigured property
+    intercept[NoSuchElementException] {
+      RpcTimeout(conf, "spark.ask.invalid.timeout")
+    }
+  }
+
+  test("ask a message timeout on Future using RpcTimeout") {
+    case class NeverReply(msg: String)
+
+    val rpcEndpointRef = env.setupEndpoint("ask-future", new RpcEndpoint {
+      override val rpcEnv = env
+
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+        case msg: String => context.reply(msg)
+        case _: NeverReply =>
+      }
+    })
+
+    val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout")
+    val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout")
+
+    // Ask with immediate response, should complete successfully
+    val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
+    val reply1 = longTimeout.awaitResult(fut1)
+    assert("hello" === reply1)
+
+    // Ask with a delayed response and immediately wait for the reply; this should time out
+    val fut2 = rpcEndpointRef.ask[String](NeverReply("doh"), shortTimeout)
+    val reply2 =
+      intercept[RpcTimeoutException] {
+        shortTimeout.awaitResult(fut2)
+      }.getMessage
+
+    // RpcTimeout.awaitResult should have added the property to the TimeoutException message
+    assert(reply2.contains(shortTimeout.timeoutProp))
+
+    // Ask with delayed response and allow the Future to timeout before Await.result
+    val fut3 = rpcEndpointRef.ask[String](NeverReply("goodbye"), shortTimeout)
+
+    // Allow future to complete with failure using plain Await.result, this will return
+    // once the future is complete to verify addMessageIfTimeout was invoked
+    val reply3 =
+      intercept[RpcTimeoutException] {
+        Await.result(fut3, 200 millis)
+      }.getMessage
+
+    // When the future timed out, the recover callback should have used
+    // RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message
+    assert(reply3.contains(shortTimeout.timeoutProp))
+
+    // Use RpcTimeout.awaitResult to process Future, since it has already failed with
+    // RpcTimeoutException, the same RpcTimeoutException should be thrown
+    val reply4 =
+      intercept[RpcTimeoutException] {
+        shortTimeout.awaitResult(fut3)
+      }.getMessage
+
+    // Ensure description is not in message twice after addMessageIfTimeout and awaitResult
+    assert(shortTimeout.timeoutProp.r.findAllIn(reply4).length === 1)
+  }
+
 }
 
 class UnserializableClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org