You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/19 12:34:11 UTC

spark git commit: [SPARK-19450] Replace askWithRetry with askSync.

Repository: spark
Updated Branches:
  refs/heads/master df3cbe3a3 -> ba8912e5f


[SPARK-19450] Replace askWithRetry with askSync.

## What changes were proposed in this pull request?

`askSync` is already added in `RpcEndpointRef` (see SPARK-19347 and https://github.com/apache/spark/pull/16690#issuecomment-276850068) and `askWithRetry` is marked as deprecated.
As mentioned SPARK-18113(https://github.com/apache/spark/pull/16503#event-927953218):

>askWithRetry is basically an unneeded API, and a leftover from the akka days that doesn't make sense anymore. It's prone to cause deadlocks (exactly because it's blocking), it imposes restrictions on the caller (e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

Since `askWithRetry` is just used inside spark and not in user logic. It might make sense to replace all of them with `askSync`.

## How was this patch tested?
This PR doesn't change code logic, existing unit test can cover.

Author: jinxing <ji...@meituan.com>

Closes #16790 from jinxing64/SPARK-19450.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba8912e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba8912e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba8912e5

Branch: refs/heads/master
Commit: ba8912e5f3d5c5a366cb3d1f6be91f2471d048d2
Parents: df3cbe3
Author: jinxing <ji...@meituan.com>
Authored: Sun Feb 19 04:34:07 2017 -0800
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Feb 19 04:34:07 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |  2 +-
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../scala/org/apache/spark/deploy/Client.scala  |  2 +-
 .../org/apache/spark/deploy/master/Master.scala |  2 +-
 .../deploy/master/ui/ApplicationPage.scala      |  2 +-
 .../spark/deploy/master/ui/MasterPage.scala     |  2 +-
 .../deploy/rest/StandaloneRestServer.scala      |  6 +-
 .../spark/deploy/worker/ui/WorkerPage.scala     |  4 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  2 +-
 .../org/apache/spark/executor/Executor.scala    |  2 +-
 .../org/apache/spark/rpc/RpcEndpointRef.scala   | 60 --------------------
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |  4 +-
 .../spark/storage/BlockManagerMaster.scala      | 32 +++++------
 .../StandaloneDynamicAllocationSuite.scala      |  4 +-
 .../spark/deploy/client/AppClientSuite.scala    |  2 +-
 .../spark/deploy/master/MasterSuite.scala       |  2 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      | 21 ++++---
 .../spark/storage/BlockManagerSuite.scala       |  2 +-
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  2 +-
 .../spark/deploy/yarn/YarnAllocator.scala       |  2 +-
 .../streaming/state/StateStoreCoordinator.scala |  8 +--
 .../receiver/ReceiverSupervisorImpl.scala       |  4 +-
 .../streaming/scheduler/ReceiverTracker.scala   |  6 +-
 24 files changed, 58 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4ca442b..4ef6656 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -99,7 +99,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    */
   protected def askTracker[T: ClassTag](message: Any): T = {
     try {
-      trackerEndpoint.askWithRetry[T](message)
+      trackerEndpoint.askSync[T](message)
     } catch {
       case e: Exception =>
         logError("Error communicating with MapOutputTracker", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7e56406..e4d8389 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -605,7 +605,7 @@ class SparkContext(config: SparkConf) extends Logging {
         Some(Utils.getThreadDump())
       } else {
         val endpointRef = env.blockManager.master.getExecutorEndpointRef(executorId).get
-        Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
+        Some(endpointRef.askSync[Array[ThreadStackTrace]](TriggerThreadDump))
       }
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index a4de3d7..bf60932 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -123,7 +123,7 @@ private class ClientEndpoint(
     Thread.sleep(5000)
     logInfo("... polling master for driver state")
     val statusResponse =
-      activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
+      activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
     if (statusResponse.found) {
       logInfo(s"State of $driverId is ${statusResponse.state.get}")
       // Worker node, if present

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c5f7c07..816bf37 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -1045,7 +1045,7 @@ private[deploy] object Master extends Logging {
     val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
     val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
       new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
-    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
+    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
     (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 18cff31..946a928 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -34,7 +34,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
   /** Executor details for a particular application */
   def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
-    val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
+    val state = master.askSync[MasterStateResponse](RequestMasterState)
     val app = state.activeApps.find(_.id == appId)
       .getOrElse(state.completedApps.find(_.id == appId).orNull)
     if (app == null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index ebbbbd3..7dbe329 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -33,7 +33,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
   private val master = parent.masterEndpointRef
 
   def getMasterState: MasterStateResponse = {
-    master.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.askSync[MasterStateResponse](RequestMasterState)
   }
 
   override def renderJson(request: HttpServletRequest): JValue = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index c19296c..5662006 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -71,7 +71,7 @@ private[rest] class StandaloneKillRequestServlet(masterEndpoint: RpcEndpointRef,
   extends KillRequestServlet {
 
   protected def handleKill(submissionId: String): KillSubmissionResponse = {
-    val response = masterEndpoint.askWithRetry[DeployMessages.KillDriverResponse](
+    val response = masterEndpoint.askSync[DeployMessages.KillDriverResponse](
       DeployMessages.RequestKillDriver(submissionId))
     val k = new KillSubmissionResponse
     k.serverSparkVersion = sparkVersion
@@ -89,7 +89,7 @@ private[rest] class StandaloneStatusRequestServlet(masterEndpoint: RpcEndpointRe
   extends StatusRequestServlet {
 
   protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
-    val response = masterEndpoint.askWithRetry[DeployMessages.DriverStatusResponse](
+    val response = masterEndpoint.askSync[DeployMessages.DriverStatusResponse](
       DeployMessages.RequestDriverStatus(submissionId))
     val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
     val d = new SubmissionStatusResponse
@@ -174,7 +174,7 @@ private[rest] class StandaloneSubmitRequestServlet(
     requestMessage match {
       case submitRequest: CreateSubmissionRequest =>
         val driverDescription = buildDriverDescription(submitRequest)
-        val response = masterEndpoint.askWithRetry[DeployMessages.SubmitDriverResponse](
+        val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
           DeployMessages.RequestSubmitDriver(driverDescription))
         val submitResponse = new CreateSubmissionResponse
         submitResponse.serverSparkVersion = sparkVersion

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
index 8ebcbcb..1ad9731 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
@@ -34,12 +34,12 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
   private val workerEndpoint = parent.worker.self
 
   override def renderJson(request: HttpServletRequest): JValue = {
-    val workerState = workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
+    val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
     JsonProtocol.writeWorkerState(workerState)
   }
 
   def render(request: HttpServletRequest): Seq[Node] = {
-    val workerState = workerEndpoint.askWithRetry[WorkerStateResponse](RequestWorkerState)
+    val workerState = workerEndpoint.askSync[WorkerStateResponse](RequestWorkerState)
 
     val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
     val runningExecutors = workerState.executors

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4a38560..b376ecd 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -199,7 +199,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         new SecurityManager(executorConf),
         clientMode = true)
       val driver = fetcher.setupEndpointRefByURI(driverUrl)
-      val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
+      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
       val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
       fetcher.shutdown()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index db5d0d8..d762f11 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -677,7 +677,7 @@ private[spark] class Executor(
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
     try {
-      val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
+      val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
           message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index a577887..4d39f14 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -92,64 +92,4 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf)
     timeout.awaitResult(future)
   }
 
-  /**
-   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
-   * default timeout, throw a SparkException if this fails even after the default number of retries.
-   * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
-   * method retries, the message handling in the receiver side should be idempotent.
-   *
-   * Note: this is a blocking action which may cost a lot of time,  so don't call it in a message
-   * loop of [[RpcEndpoint]].
-   *
-   * @param message the message to send
-   * @tparam T type of the reply message
-   * @return the reply message from the corresponding [[RpcEndpoint]]
-   */
-  @deprecated("use 'askSync' instead.", "2.2.0")
-  def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
-
-  /**
-   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
-   * specified timeout, throw a SparkException if this fails even after the specified number of
-   * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
-   * retries, the message handling in the receiver side should be idempotent.
-   *
-   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
-   * loop of [[RpcEndpoint]].
-   *
-   * @param message the message to send
-   * @param timeout the timeout duration
-   * @tparam T type of the reply message
-   * @return the reply message from the corresponding [[RpcEndpoint]]
-   */
-  @deprecated("use 'askSync' instead.", "2.2.0")
-  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
-    // TODO: Consider removing multiple attempts
-    var attempts = 0
-    var lastException: Exception = null
-    while (attempts < maxRetries) {
-      attempts += 1
-      try {
-        val future = ask[T](message, timeout)
-        val result = timeout.awaitResult(future)
-        if (result == null) {
-          throw new SparkException("RpcEndpoint returned null")
-        }
-        return result
-      } catch {
-        case ie: InterruptedException => throw ie
-        case e: Exception =>
-          lastException = e
-          logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
-      }
-
-      if (attempts < maxRetries) {
-        Thread.sleep(retryWaitMs)
-      }
-    }
-
-    throw new SparkException(
-      s"Error sending message [message = $message]", lastException)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0b7d371..692ed80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -232,7 +232,7 @@ class DAGScheduler(
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
-    blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
+    blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e006cc9..94abe30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -372,7 +372,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     try {
       if (driverEndpoint != null) {
         logInfo("Shutting down all executors")
-        driverEndpoint.askWithRetry[Boolean](StopExecutors)
+        driverEndpoint.askSync[Boolean](StopExecutors)
       }
     } catch {
       case e: Exception =>
@@ -384,7 +384,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     stopExecutors()
     try {
       if (driverEndpoint != null) {
-        driverEndpoint.askWithRetry[Boolean](StopDriver)
+        driverEndpoint.askSync[Boolean](StopDriver)
       }
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 7a60006..3ca690d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -60,7 +60,7 @@ class BlockManagerMaster(
       maxMemSize: Long,
       slaveEndpoint: RpcEndpointRef): BlockManagerId = {
     logInfo(s"Registering BlockManager $blockManagerId")
-    val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
+    val updatedId = driverEndpoint.askSync[BlockManagerId](
       RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
     logInfo(s"Registered BlockManager $updatedId")
     updatedId
@@ -72,7 +72,7 @@ class BlockManagerMaster(
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long): Boolean = {
-    val res = driverEndpoint.askWithRetry[Boolean](
+    val res = driverEndpoint.askSync[Boolean](
       UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
     logDebug(s"Updated info of block $blockId")
     res
@@ -80,12 +80,12 @@ class BlockManagerMaster(
 
   /** Get locations of the blockId from the driver */
   def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
-    driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
+    driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
   }
 
   /** Get locations of multiple blockIds from the driver */
   def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
-    driverEndpoint.askWithRetry[IndexedSeq[Seq[BlockManagerId]]](
+    driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
       GetLocationsMultipleBlockIds(blockIds))
   }
 
@@ -99,11 +99,11 @@ class BlockManagerMaster(
 
   /** Get ids of other nodes in the cluster from the driver */
   def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
+    driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
   def getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef] = {
-    driverEndpoint.askWithRetry[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
+    driverEndpoint.askSync[Option[RpcEndpointRef]](GetExecutorEndpointRef(executorId))
   }
 
   /**
@@ -111,12 +111,12 @@ class BlockManagerMaster(
    * blocks that the driver knows about.
    */
   def removeBlock(blockId: BlockId) {
-    driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
+    driverEndpoint.askSync[Boolean](RemoveBlock(blockId))
   }
 
   /** Remove all blocks belonging to the given RDD. */
   def removeRdd(rddId: Int, blocking: Boolean) {
-    val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
+    val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e)
@@ -128,7 +128,7 @@ class BlockManagerMaster(
 
   /** Remove all blocks belonging to the given shuffle. */
   def removeShuffle(shuffleId: Int, blocking: Boolean) {
-    val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
+    val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e)
@@ -140,7 +140,7 @@ class BlockManagerMaster(
 
   /** Remove all blocks belonging to the given broadcast. */
   def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
-    val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](
+    val future = driverEndpoint.askSync[Future[Seq[Int]]](
       RemoveBroadcast(broadcastId, removeFromMaster))
     future.onFailure {
       case e: Exception =>
@@ -159,11 +159,11 @@ class BlockManagerMaster(
    * amount of remaining memory.
    */
   def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
+    driverEndpoint.askSync[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
   }
 
   def getStorageStatus: Array[StorageStatus] = {
-    driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)
+    driverEndpoint.askSync[Array[StorageStatus]](GetStorageStatus)
   }
 
   /**
@@ -184,7 +184,7 @@ class BlockManagerMaster(
      * master endpoint for a response to a prior message.
      */
     val response = driverEndpoint.
-      askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
+      askSync[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
     implicit val sameThread = ThreadUtils.sameThread
     val cbf =
@@ -214,7 +214,7 @@ class BlockManagerMaster(
       filter: BlockId => Boolean,
       askSlaves: Boolean): Seq[BlockId] = {
     val msg = GetMatchingBlockIds(filter, askSlaves)
-    val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
+    val future = driverEndpoint.askSync[Future[Seq[BlockId]]](msg)
     timeout.awaitResult(future)
   }
 
@@ -223,7 +223,7 @@ class BlockManagerMaster(
    * since they are not reported the master.
    */
   def hasCachedBlocks(executorId: String): Boolean = {
-    driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
+    driverEndpoint.askSync[Boolean](HasCachedBlocks(executorId))
   }
 
   /** Stop the driver endpoint, called only on the Spark driver node */
@@ -237,7 +237,7 @@ class BlockManagerMaster(
 
   /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */
   private def tell(message: Any) {
-    if (!driverEndpoint.askWithRetry[Boolean](message)) {
+    if (!driverEndpoint.askSync[Boolean](message)) {
       throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 54ea727..9839dcf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -547,7 +547,7 @@ class StandaloneDynamicAllocationSuite
 
   /** Get the Master state */
   private def getMasterState: MasterStateResponse = {
-    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.self.askSync[MasterStateResponse](RequestMasterState)
   }
 
   /** Get the applications that are active from Master */
@@ -620,7 +620,7 @@ class StandaloneDynamicAllocationSuite
       when(endpointRef.address).thenReturn(mockAddress)
       val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty)
       val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
-      backend.driverEndpoint.askWithRetry[Boolean](message)
+      backend.driverEndpoint.askSync[Boolean](message)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index bc58fb2..936639b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -171,7 +171,7 @@ class AppClientSuite
 
   /** Get the Master state */
   private def getMasterState: MasterStateResponse = {
-    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+    master.self.askSync[MasterStateResponse](RequestMasterState)
   }
 
   /** Get the applications that are active from Master */

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index da7253b..2127da4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -432,7 +432,7 @@ class MasterSuite extends SparkFunSuite
     val master = makeMaster()
     master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
     eventually(timeout(10.seconds)) {
-      val masterState = master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
       assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index b4037d7..31d9dd3 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -118,8 +118,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     }
     val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
-    val newRpcEndpointRef = rpcEndpointRef.askWithRetry[RpcEndpointRef]("Hello")
-    val reply = newRpcEndpointRef.askWithRetry[String]("Echo")
+    val newRpcEndpointRef = rpcEndpointRef.askSync[RpcEndpointRef]("Hello")
+    val reply = newRpcEndpointRef.askSync[String]("Echo")
     assert("Echo" === reply)
   }
 
@@ -132,7 +132,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
           context.reply(msg)
       }
     })
-    val reply = rpcEndpointRef.askWithRetry[String]("hello")
+    val reply = rpcEndpointRef.askSync[String]("hello")
     assert("hello" === reply)
   }
 
@@ -150,7 +150,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-remotely")
     try {
-      val reply = rpcEndpointRef.askWithRetry[String]("hello")
+      val reply = rpcEndpointRef.askSync[String]("hello")
       assert("hello" === reply)
     } finally {
       anotherEnv.shutdown()
@@ -177,14 +177,13 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
     try {
-      // Any exception thrown in askWithRetry is wrapped with a SparkException and set as the cause
-      val e = intercept[SparkException] {
-        rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1 millis, shortProp))
+      val e = intercept[RpcTimeoutException] {
+        rpcEndpointRef.askSync[String]("hello", new RpcTimeout(1 millis, shortProp))
       }
       // The SparkException cause should be a RpcTimeoutException with message indicating the
       // controlling timeout property
-      assert(e.getCause.isInstanceOf[RpcTimeoutException])
-      assert(e.getCause.getMessage.contains(shortProp))
+      assert(e.isInstanceOf[RpcTimeoutException])
+      assert(e.getMessage.contains(shortProp))
     } finally {
       anotherEnv.shutdown()
       anotherEnv.awaitTermination()
@@ -677,7 +676,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
         }
       })
       val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "ask-authentication")
-      val reply = rpcEndpointRef.askWithRetry[String]("hello")
+      val reply = rpcEndpointRef.askSync[String]("hello")
       assert("hello" === reply)
     } finally {
       localEnv.shutdown()
@@ -894,7 +893,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     val ref = anotherEnv.setupEndpointRef(env.address, "SPARK-14699")
     // Make sure the connect is set up
-    assert(ref.askWithRetry[String]("hello") === "hello")
+    assert(ref.askSync[String]("hello") === "hello")
     anotherEnv.shutdown()
     anotherEnv.awaitTermination()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 705c355..64a67b4 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -394,7 +394,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    val reregister = !master.driverEndpoint.askWithRetry[Boolean](
+    val reregister = !master.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
     assert(reregister == true)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index a674da4..cdb3b68 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -442,7 +442,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.stop()
     // Any method of the backend involving sending messages to the driver endpoint should not
     // be called after the backend is stopped.
-    verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+    verify(driverEndpoint, never()).askSync(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
   }
 
   test("mesos supports spark.executor.uri") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 8a76dbd..abd2de7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -101,7 +101,7 @@ private[yarn] class YarnAllocator(
    * @see SPARK-12864
    */
   private var executorIdCounter: Int =
-    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
+    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
 
   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 267d176..d0f8188 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -88,21 +88,21 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
 
   /** Verify whether the given executor has the active instance of a state store */
   private[state] def verifyIfInstanceActive(storeId: StateStoreId, executorId: String): Boolean = {
-    rpcEndpointRef.askWithRetry[Boolean](VerifyIfInstanceActive(storeId, executorId))
+    rpcEndpointRef.askSync[Boolean](VerifyIfInstanceActive(storeId, executorId))
   }
 
   /** Get the location of the state store */
   private[state] def getLocation(storeId: StateStoreId): Option[String] = {
-    rpcEndpointRef.askWithRetry[Option[String]](GetLocation(storeId))
+    rpcEndpointRef.askSync[Option[String]](GetLocation(storeId))
   }
 
   /** Deactivate instances related to a set of operator */
   private[state] def deactivateInstances(storeRootLocation: String): Unit = {
-    rpcEndpointRef.askWithRetry[Boolean](DeactivateInstances(storeRootLocation))
+    rpcEndpointRef.askSync[Boolean](DeactivateInstances(storeRootLocation))
   }
 
   private[state] def stop(): Unit = {
-    rpcEndpointRef.askWithRetry[Boolean](StopCoordinator)
+    rpcEndpointRef.askSync[Boolean](StopCoordinator)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 722024b..f5c8a88 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -188,13 +188,13 @@ private[streaming] class ReceiverSupervisorImpl(
   override protected def onReceiverStart(): Boolean = {
     val msg = RegisterReceiver(
       streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
-    trackerEndpoint.askWithRetry[Boolean](msg)
+    trackerEndpoint.askSync[Boolean](msg)
   }
 
   override protected def onReceiverStop(message: String, error: Option[Throwable]) {
     logInfo("Deregistering receiver " + streamId)
     val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
-    trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
+    trackerEndpoint.askSync[Boolean](DeregisterReceiver(streamId, message, errorString))
     logInfo("Stopped receiver " + streamId)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8912e5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 8f55d98..bd7ab0b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -170,7 +170,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       trackerState = Stopping
       if (!skipReceiverLaunch) {
         // Send the stop signal to all the receivers
-        endpoint.askWithRetry[Boolean](StopAllReceivers)
+        endpoint.askSync[Boolean](StopAllReceivers)
 
         // Wait for the Spark job that runs the receivers to be over
         // That is, for the receivers to quit gracefully.
@@ -183,7 +183,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         }
 
         // Check if all the receivers have been deregistered or not
-        val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds)
+        val receivers = endpoint.askSync[Seq[Int]](AllReceiverIds)
         if (receivers.nonEmpty) {
           logWarning("Not all of the receivers have deregistered, " + receivers)
         } else {
@@ -249,7 +249,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
    */
   def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
     if (isTrackerStarted) {
-      endpoint.askWithRetry[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+      endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
         _.runningExecutor.map {
           _.executorId
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org