Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/29 18:46:41 UTC

spark git commit: [SPARK-7223] Rename RPC askWithReply -> askWithRetry, sendWithReply -> ask.

Repository: spark
Updated Branches:
  refs/heads/master baed3f2c7 -> 687273d91


[SPARK-7223] Rename RPC askWithReply -> askWithRetry, sendWithReply -> ask.

The old naming scheme was confusing: askWithReply and sendWithReply were easy to mix up. I also split RpcEnv.scala into multiple files.
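
As a rough sketch of what the rename means at a call site (`ref` and `GetStatus` below are hypothetical, and the snippet assumes it lives inside the Spark source tree, since the RPC classes are private[spark]):

    package org.apache.spark.rpc   // sketch only: the RPC API is private[spark]

    import scala.concurrent.Future

    case object GetStatus          // hypothetical message type

    object RenameSketch {
      def callSites(ref: RpcEndpointRef): Unit = {
        // was ref.sendWithReply[String](GetStatus): asynchronous, single attempt
        val future: Future[String] = ref.ask[String](GetStatus)
        // was ref.askWithReply[String](GetStatus): blocking, retried on failure
        val status: String = ref.askWithRetry[String](GetStatus)
        // unchanged: one-way fire-and-forget
        ref.send(GetStatus)
      }
    }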

Author: Reynold Xin <rx...@databricks.com>

Closes #5768 from rxin/rpc-rename and squashes the following commits:

a84058e [Reynold Xin] [SPARK-7223] Rename RPC askWithReply -> askWithRetry, sendWithReply -> ask.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/687273d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/687273d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/687273d9

Branch: refs/heads/master
Commit: 687273d9150e1c89a74aa1473f0c6495f56509af
Parents: baed3f2
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Apr 29 09:46:37 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Apr 29 09:46:37 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |   2 +-
 .../scala/org/apache/spark/SparkContext.scala   |   2 +-
 .../executor/CoarseGrainedExecutorBackend.scala |   4 +-
 .../org/apache/spark/executor/Executor.scala    |   2 +-
 .../org/apache/spark/rpc/RpcCallContext.scala   |  41 +++
 .../org/apache/spark/rpc/RpcEndpoint.scala      | 148 +++++++++
 .../org/apache/spark/rpc/RpcEndpointRef.scala   | 119 +++++++
 .../scala/org/apache/spark/rpc/RpcEnv.scala     | 313 ++-----------------
 .../org/apache/spark/rpc/akka/AkkaRpcEnv.scala  |   2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   2 +-
 .../scheduler/OutputCommitCoordinator.scala     |   2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   6 +-
 .../cluster/YarnSchedulerBackend.scala          |   8 +-
 .../spark/scheduler/local/LocalBackend.scala    |   2 +-
 .../spark/storage/BlockManagerMaster.scala      |  28 +-
 .../storage/BlockManagerMasterEndpoint.scala    |  12 +-
 .../apache/spark/HeartbeatReceiverSuite.scala   |   4 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      |  20 +-
 .../spark/storage/BlockManagerSuite.scala       |   2 +-
 .../receiver/ReceiverSupervisorImpl.scala       |   6 +-
 20 files changed, 394 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index d65c94e..1607228 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -106,7 +106,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
    */
   protected def askTracker[T: ClassTag](message: Any): T = {
     try {
-      trackerEndpoint.askWithReply[T](message)
+      trackerEndpoint.askWithRetry[T](message)
     } catch {
       case e: Exception =>
         logError("Error communicating with MapOutputTracker", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d0cf2a8..5ae8fb8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -555,7 +555,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
           SparkEnv.executorActorSystemName,
           RpcAddress(host, port),
           ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
-        Some(endpointRef.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump))
+        Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
       }
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8af46f3..79aed90 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -57,7 +57,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     logInfo("Connecting to driver: " + driverUrl)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       driver = Some(ref)
-      ref.sendWithReply[RegisteredExecutor.type](
+      ref.ask[RegisteredExecutor.type](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     } onComplete {
       case Success(msg) => Utils.tryLogNonFatalError {
@@ -154,7 +154,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         executorConf,
         new SecurityManager(executorConf))
       val driver = fetcher.setupEndpointRefByURI(driverUrl)
-      val props = driver.askWithReply[Seq[(String, String)]](RetrieveSparkProps) ++
+      val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
         Seq[(String, String)](("spark.app.id", appId))
       fetcher.shutdown()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dd1c48e..8f916e0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -441,7 +441,7 @@ private[spark] class Executor(
 
     val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
     try {
-      val response = heartbeatReceiverRef.askWithReply[HeartbeatResponse](message)
+      val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message)
       if (response.reregisterBlockManager) {
         logWarning("Told to re-register on heartbeat")
         env.blockManager.reregister()

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala b/core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala
new file mode 100644
index 0000000..3e5b642
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcCallContext.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+/**
+ * A callback that [[RpcEndpoint]] can use to send back a message or failure. It's thread-safe
+ * and can be called in any thread.
+ */
+private[spark] trait RpcCallContext {
+
+  /**
+   * Send a reply message to the sender. If the sender is an [[RpcEndpoint]], its [[RpcEndpoint.receive]]
+   * will be called.
+   */
+  def reply(response: Any): Unit
+
+  /**
+   * Report a failure to the sender.
+   */
+  def sendFailure(e: Throwable): Unit
+
+  /**
+   * The sender of this message.
+   */
+  def sender: RpcEndpointRef
+}
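
A rough sketch of how an endpoint is expected to use this callback (`Divide` and `DivideEndpoint` are hypothetical names, and the code assumes it lives inside the Spark source tree because the RPC API is private[spark]):

    package org.apache.spark.rpc   // sketch only: the RPC API is private[spark]

    case class Divide(a: Int, b: Int)   // hypothetical request message

    class DivideEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case Divide(a, b) if b != 0 =>
          context.reply(a / b)             // completes the caller's ask Future
        case Divide(_, 0) =>
          // surfaces on the caller side as a failed Future (or a thrown exception for askWithRetry)
          context.sendFailure(new ArithmeticException("division by zero"))
      }
    }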

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
new file mode 100644
index 0000000..d2b2bae
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import org.apache.spark.SparkException
+
+/**
+ * A factory class to create the [[RpcEnv]]. It must have an empty constructor so that it can be
+ * created using Reflection.
+ */
+private[spark] trait RpcEnvFactory {
+
+  def create(config: RpcEnvConfig): RpcEnv
+}
+
+/**
+ * A trait that requires the [[RpcEnv]] to deliver messages to it in a thread-safe manner.
+ *
+ * Thread-safety means processing of one message happens before processing of the next message by
+ * the same [[ThreadSafeRpcEndpoint]]. In other words, changes to internal fields of a
+ * [[ThreadSafeRpcEndpoint]] are visible when processing the next message, and fields in the
+ * [[ThreadSafeRpcEndpoint]] need not be volatile or equivalent.
+ *
+ * However, there is no guarantee that the same thread will be executing the same
+ * [[ThreadSafeRpcEndpoint]] for different messages.
+ */
+private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
+
+
+/**
+ * An endpoint for RPC that defines what functions to trigger given a message.
+ *
+ * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
+ *
+ * The life-cycle will be:
+ *
+ * constructor -> onStart -> receive* -> onStop
+ *
+ * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
+ * [[ThreadSafeRpcEndpoint]]
+ *
+ * If any error is thrown from one of the [[RpcEndpoint]] methods except `onError`, `onError`
+ * will be invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
+ */
+private[spark] trait RpcEndpoint {
+
+  /**
+   * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
+   */
+  val rpcEnv: RpcEnv
+
+  /**
+   * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
+   * called, and will become `null` when `onStop` is called.
+   *
+   * Note: before `onStart`, the [[RpcEndpoint]] has not yet been registered, so there is no
+   * valid [[RpcEndpointRef]] for it. Don't call `self` before `onStart` is called.
+   */
+  final def self: RpcEndpointRef = {
+    require(rpcEnv != null, "rpcEnv has not been initialized")
+    rpcEnv.endpointRef(this)
+  }
+
+  /**
+   * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving an
+   * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
+   */
+  def receive: PartialFunction[Any, Unit] = {
+    case _ => throw new SparkException(self + " does not implement 'receive'")
+  }
+
+  /**
+   * Process messages from [[RpcEndpointRef.ask]]. If receiving an unmatched message,
+   * [[SparkException]] will be thrown and sent to `onError`.
+   */
+  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
+  }
+
+  /**
+   * Invoked when any exception is thrown while handling messages.
+   */
+  def onError(cause: Throwable): Unit = {
+    // By default, throw e and let RpcEnv handle it
+    throw cause
+  }
+
+  /**
+   * Invoked before [[RpcEndpoint]] starts to handle any message.
+   */
+  def onStart(): Unit = {
+    // By default, do nothing.
+  }
+
+  /**
+   * Invoked when [[RpcEndpoint]] is stopping.
+   */
+  def onStop(): Unit = {
+    // By default, do nothing.
+  }
+
+  /**
+   * Invoked when `remoteAddress` is connected to the current node.
+   */
+  def onConnected(remoteAddress: RpcAddress): Unit = {
+    // By default, do nothing.
+  }
+
+  /**
+   * Invoked when `remoteAddress` is lost.
+   */
+  def onDisconnected(remoteAddress: RpcAddress): Unit = {
+    // By default, do nothing.
+  }
+
+  /**
+   * Invoked when some network error happens in the connection between the current node and
+   * `remoteAddress`.
+   */
+  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
+    // By default, do nothing.
+  }
+
+  /**
+   * A convenience method to stop the [[RpcEndpoint]].
+   */
+  final def stop(): Unit = {
+    val _self = self
+    if (_self != null) {
+      rpcEnv.stop(_self)
+    }
+  }
+}
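
A rough sketch of the constructor -> onStart -> receive* -> onStop life-cycle described above (`CountingEndpoint` and its messages are hypothetical; the code assumes it lives inside the Spark source tree because the RPC API is private[spark]):

    package org.apache.spark.rpc   // sketch only: the RPC API is private[spark]

    import org.apache.spark.Logging

    // Extending ThreadSafeRpcEndpoint means messages are processed one at a time,
    // so the plain `var count` below needs no volatile or locking.
    class CountingEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging {

      private var count = 0

      override def onStart(): Unit = logInfo("started; `self` is valid from here on: " + self)

      override def receive: PartialFunction[Any, Unit] = {
        case "increment" => count += 1        // one-way messages from RpcEndpointRef.send
      }

      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case "count" => context.reply(count)  // request/reply messages from RpcEndpointRef.ask
      }

      override def onStop(): Unit = logInfo(s"stopped after $count increments")
    }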

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
new file mode 100644
index 0000000..69181ed
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rpc
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.FiniteDuration
+import scala.reflect.ClassTag
+
+import org.apache.spark.util.RpcUtils
+import org.apache.spark.{SparkException, Logging, SparkConf}
+
+/**
+ * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
+ */
+private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
+  extends Serializable with Logging {
+
+  private[this] val maxRetries = RpcUtils.numRetries(conf)
+  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
+  private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
+
+  /**
+   * Return the address for the [[RpcEndpointRef]].
+   */
+  def address: RpcAddress
+
+  def name: String
+
+  /**
+   * Sends a one-way asynchronous message. Fire-and-forget semantics.
+   */
+  def send(message: Any): Unit
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and return a [[Future]] to
+   * receive the reply within the specified timeout.
+   *
+   * This method only sends the message once and never retries.
+   */
+  def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and return a [[Future]] to
+   * receive the reply within a default timeout.
+   *
+   * This method only sends the message once and never retries.
+   */
+  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
+   * timeout, or throw a SparkException if this fails even after the default number of retries.
+   * The default `timeout` will be used for every `ask` attempt. Because this method retries,
+   * the message handling on the receiver side should be idempotent.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in the message
+   * loop of an [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)
+
+  /**
+   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result
+   * within a specified timeout, or throw a SparkException if this fails even after the specified
+   * number of retries. `timeout` will be used for every `ask` attempt. Because this method
+   * retries, the message handling on the receiver side should be idempotent.
+   *
+   * Note: this is a blocking action which may cost a lot of time, so don't call it in the message
+   * loop of an [[RpcEndpoint]].
+   *
+   * @param message the message to send
+   * @param timeout the timeout duration
+   * @tparam T type of the reply message
+   * @return the reply message from the corresponding [[RpcEndpoint]]
+   */
+  def askWithRetry[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
+    // TODO: Consider removing multiple attempts
+    var attempts = 0
+    var lastException: Exception = null
+    while (attempts < maxRetries) {
+      attempts += 1
+      try {
+        val future = ask[T](message, timeout)
+        val result = Await.result(future, timeout)
+        if (result == null) {
+          throw new SparkException("Actor returned null")
+        }
+        return result
+      } catch {
+        case ie: InterruptedException => throw ie
+        case e: Exception =>
+          lastException = e
+          logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
+      }
+      Thread.sleep(retryWaitMs)
+    }
+
+    throw new SparkException(
+      s"Error sending message [message = $message]", lastException)
+  }
+}
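
A rough sketch of the two caller-side contracts described above, assuming `ref` points at an endpoint whose receiveAndReply answers the hypothetical "count" message with an Int (Spark-internal code, since RpcEndpointRef is private[spark]):

    package org.apache.spark.rpc   // sketch only: the RPC API is private[spark]

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.util.{Failure, Success}

    object AskSketch {
      def query(ref: RpcEndpointRef): Unit = {
        // ask: asynchronous, a single attempt; the reply (or failure) arrives on a Future.
        ref.ask[Int]("count", 10.seconds).onComplete {
          case Success(n) => println(s"count = $n")
          case Failure(e) => println(s"ask failed: $e")
        }

        // askWithRetry: blocking, re-sent up to the configured number of attempts
        // (see RpcUtils.numRetries / RpcUtils.retryWaitMs), so the handler must be idempotent.
        val n: Int = ref.askWithRetry[Int]("count", 10.seconds)
        println(s"count = $n (blocking)")
      }
    }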

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index a5336b7..12b6b28 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -20,13 +20,41 @@ package org.apache.spark.rpc
 import java.net.URI
 
 import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.reflect.ClassTag
 
-import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.util.{RpcUtils, Utils}
 
+
+/**
+ * An RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
+ * so that it can be created via reflection.
+ */
+private[spark] object RpcEnv {
+
+  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
+    // Add more RpcEnv implementations here
+    val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory")
+    val rpcEnvName = conf.get("spark.rpc", "akka")
+    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
+    Class.forName(rpcEnvFactoryClassName, true, Utils.getContextOrSparkClassLoader).
+      newInstance().asInstanceOf[RpcEnvFactory]
+  }
+
+  def create(
+      name: String,
+      host: String,
+      port: Int,
+      conf: SparkConf,
+      securityManager: SecurityManager): RpcEnv = {
+    // Use reflection to create the RpcEnv, to avoid depending on Akka directly
+    val config = RpcEnvConfig(conf, name, host, port, securityManager)
+    getRpcEnvFactory(conf).create(config)
+  }
+
+}
+
+
 /**
  * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
  * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
@@ -112,6 +140,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
   def uriOf(systemName: String, address: RpcAddress, endpointName: String): String
 }
 
+
 private[spark] case class RpcEnvConfig(
     conf: SparkConf,
     name: String,
@@ -119,261 +148,9 @@ private[spark] case class RpcEnvConfig(
     port: Int,
     securityManager: SecurityManager)
 
-/**
- * A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
- * so that it can be created via Reflection.
- */
-private[spark] object RpcEnv {
-
-  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
-    // Add more RpcEnv implementations here
-    val rpcEnvNames = Map("akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory")
-    val rpcEnvName = conf.get("spark.rpc", "akka")
-    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
-    Class.forName(rpcEnvFactoryClassName, true, Utils.getContextOrSparkClassLoader).
-      newInstance().asInstanceOf[RpcEnvFactory]
-  }
-
-  def create(
-      name: String,
-      host: String,
-      port: Int,
-      conf: SparkConf,
-      securityManager: SecurityManager): RpcEnv = {
-    // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
-    val config = RpcEnvConfig(conf, name, host, port, securityManager)
-    getRpcEnvFactory(conf).create(config)
-  }
-
-}
-
-/**
- * A factory class to create the [[RpcEnv]]. It must have an empty constructor so that it can be
- * created using Reflection.
- */
-private[spark] trait RpcEnvFactory {
-
-  def create(config: RpcEnvConfig): RpcEnv
-}
 
 /**
- * An end point for the RPC that defines what functions to trigger given a message.
- *
- * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
- *
- * The lift-cycle will be:
- *
- * constructor onStart receive* onStop
- *
- * Note: `receive` can be called concurrently. If you want `receive` is thread-safe, please use
- * [[ThreadSafeRpcEndpoint]]
- *
- * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
- * invoked with the cause. If `onError` throws an error, [[RpcEnv]] will ignore it.
- */
-private[spark] trait RpcEndpoint {
-
-  /**
-   * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
-   */
-  val rpcEnv: RpcEnv
-
-  /**
-   * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
-   * called. And `self` will become `null` when `onStop` is called.
-   *
-   * Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
-   * valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
-   */
-  final def self: RpcEndpointRef = {
-    require(rpcEnv != null, "rpcEnv has not been initialized")
-    rpcEnv.endpointRef(this)
-  }
-
-  /**
-   * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
-   * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
-   */
-  def receive: PartialFunction[Any, Unit] = {
-    case _ => throw new SparkException(self + " does not implement 'receive'")
-  }
-
-  /**
-   * Process messages from [[RpcEndpointRef.sendWithReply]]. If receiving a unmatched message,
-   * [[SparkException]] will be thrown and sent to `onError`.
-   */
-  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
-  }
-
-  /**
-   * Call onError when any exception is thrown during handling messages.
-   *
-   * @param cause
-   */
-  def onError(cause: Throwable): Unit = {
-    // By default, throw e and let RpcEnv handle it
-    throw cause
-  }
-
-  /**
-   * Invoked before [[RpcEndpoint]] starts to handle any message.
-   */
-  def onStart(): Unit = {
-    // By default, do nothing.
-  }
-
-  /**
-   * Invoked when [[RpcEndpoint]] is stopping.
-   */
-  def onStop(): Unit = {
-    // By default, do nothing.
-  }
-
-  /**
-   * Invoked when `remoteAddress` is connected to the current node.
-   */
-  def onConnected(remoteAddress: RpcAddress): Unit = {
-    // By default, do nothing.
-  }
-
-  /**
-   * Invoked when `remoteAddress` is lost.
-   */
-  def onDisconnected(remoteAddress: RpcAddress): Unit = {
-    // By default, do nothing.
-  }
-
-  /**
-   * Invoked when some network error happens in the connection between the current node and
-   * `remoteAddress`.
-   */
-  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
-    // By default, do nothing.
-  }
-
-  /**
-   * A convenient method to stop [[RpcEndpoint]].
-   */
-  final def stop(): Unit = {
-    val _self = self
-    if (_self != null) {
-      rpcEnv.stop(_self)
-    }
-  }
-}
-
-/**
- * A trait that requires RpcEnv thread-safely sending messages to it.
- *
- * Thread-safety means processing of one message happens before processing of the next message by
- * the same [[ThreadSafeRpcEndpoint]]. In the other words, changes to internal fields of a
- * [[ThreadSafeRpcEndpoint]] are visible when processing the next message, and fields in the
- * [[ThreadSafeRpcEndpoint]] need not be volatile or equivalent.
- *
- * However, there is no guarantee that the same thread will be executing the same
- * [[ThreadSafeRpcEndpoint]] for different messages.
- */
-trait ThreadSafeRpcEndpoint extends RpcEndpoint
-
-/**
- * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
- */
-private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
-  extends Serializable with Logging {
-
-  private[this] val maxRetries = RpcUtils.numRetries(conf)
-  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
-  private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
-
-  /**
-   * return the address for the [[RpcEndpointRef]]
-   */
-  def address: RpcAddress
-
-  def name: String
-
-  /**
-   * Sends a one-way asynchronous message. Fire-and-forget semantics.
-   */
-  def send(message: Any): Unit
-
-  /**
-   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a `Future` to
-   * receive the reply within a default timeout.
-   *
-   * This method only sends the message once and never retries.
-   */
-  def sendWithReply[T: ClassTag](message: Any): Future[T] =
-    sendWithReply(message, defaultAskTimeout)
-
-  /**
-   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a `Future` to
-   * receive the reply within the specified timeout.
-   *
-   * This method only sends the message once and never retries.
-   */
-  def sendWithReply[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
-
-  /**
-   * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
-   * timeout, or throw a SparkException if this fails even after the default number of retries.
-   * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
-   * method retries, the message handling in the receiver side should be idempotent.
-   *
-   * Note: this is a blocking action which may cost a lot of time,  so don't call it in an message
-   * loop of [[RpcEndpoint]].
-   *
-   * @param message the message to send
-   * @tparam T type of the reply message
-   * @return the reply message from the corresponding [[RpcEndpoint]]
-   */
-  def askWithReply[T: ClassTag](message: Any): T = askWithReply(message, defaultAskTimeout)
-
-  /**
-   * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
-   * specified timeout, throw a SparkException if this fails even after the specified number of
-   * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
-   * retries, the message handling in the receiver side should be idempotent.
-   *
-   * Note: this is a blocking action which may cost a lot of time, so don't call it in an message
-   * loop of [[RpcEndpoint]].
-   *
-   * @param message the message to send
-   * @param timeout the timeout duration
-   * @tparam T type of the reply message
-   * @return the reply message from the corresponding [[RpcEndpoint]]
-   */
-  def askWithReply[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
-    // TODO: Consider removing multiple attempts
-    var attempts = 0
-    var lastException: Exception = null
-    while (attempts < maxRetries) {
-      attempts += 1
-      try {
-        val future = sendWithReply[T](message, timeout)
-        val result = Await.result(future, timeout)
-        if (result == null) {
-          throw new SparkException("Actor returned null")
-        }
-        return result
-      } catch {
-        case ie: InterruptedException => throw ie
-        case e: Exception =>
-          lastException = e
-          logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
-      }
-      Thread.sleep(retryWaitMs)
-    }
-
-    throw new SparkException(
-      s"Error sending message [message = $message]", lastException)
-  }
-
-}
-
-/**
- * Represent a host with a port
+ * Represents a host and port.
  */
 private[spark] case class RpcAddress(host: String, port: Int) {
   // TODO do we need to add the type of RpcEnv in the address?
@@ -383,6 +160,7 @@ private[spark] case class RpcAddress(host: String, port: Int) {
   override val toString: String = hostPort
 }
 
+
 private[spark] object RpcAddress {
 
   /**
@@ -404,26 +182,3 @@ private[spark] object RpcAddress {
     RpcAddress(host, port)
   }
 }
-
-/**
- * A callback that [[RpcEndpoint]] can use it to send back a message or failure. It's thread-safe
- * and can be called in any thread.
- */
-private[spark] trait RpcCallContext {
-
-  /**
-   * Reply a message to the sender. If the sender is [[RpcEndpoint]], its [[RpcEndpoint.receive]]
-   * will be called.
-   */
-  def reply(response: Any): Unit
-
-  /**
-   * Report a failure to the sender.
-   */
-  def sendFailure(e: Throwable): Unit
-
-  /**
-   * The sender of this message.
-   */
-  def sender: RpcEndpointRef
-}
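
An end-to-end sketch tying the pieces of this file together: creating an RpcEnv through the reflective RpcEnvFactory lookup shown above, registering an endpoint, and asking it. The names are hypothetical, and the code assumes it lives inside the Spark source tree, since the RPC API is private[spark].

    package org.apache.spark.rpc   // sketch only: the RPC API is private[spark]

    import scala.concurrent.Await
    import scala.concurrent.duration._

    import org.apache.spark.{SecurityManager, SparkConf}

    object RpcEnvSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        // "spark.rpc" defaults to "akka", which resolves to AkkaRpcEnvFactory via reflection.
        val env = RpcEnv.create("sketch", "localhost", 0, conf, new SecurityManager(conf))

        val ref = env.setupEndpoint("echo", new RpcEndpoint {
          override val rpcEnv: RpcEnv = env
          override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
            case msg: String => context.reply(msg)   // echo the request back
          }
        })

        println(Await.result(ref.ask[String]("ping"), 5.seconds))   // prints "ping"

        env.stop(ref)
        env.shutdown()
        env.awaitTermination()
      }
    }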

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index 652e52f..ba0d468 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -293,7 +293,7 @@ private[akka] class AkkaRpcEndpointRef(
     actorRef ! AkkaMessage(message, false)
   }
 
-  override def sendWithReply[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
+  override def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T] = {
     import scala.concurrent.ExecutionContext.Implicits.global
     actorRef.ask(AkkaMessage(message, true))(timeout).flatMap {
       case msg @ AkkaMessage(message, reply) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b7901c0..b511c30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -167,7 +167,7 @@ class DAGScheduler(
       taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics)
       blockManagerId: BlockManagerId): Boolean = {
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
-    blockManagerMaster.driverEndpoint.askWithReply[Boolean](
+    blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
       BlockManagerHeartbeat(blockManagerId), 600 seconds)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 7c184b1..0b1d47c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -85,7 +85,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
     val msg = AskPermissionToCommitOutput(stage, partition, attempt)
     coordinatorRef match {
       case Some(endpointRef) =>
-        endpointRef.askWithReply[Boolean](msg)
+        endpointRef.askWithRetry[Boolean](msg)
       case None =>
         logError(
           "canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9656fb7..7352fa1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -252,7 +252,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     try {
       if (driverEndpoint != null) {
         logInfo("Shutting down all executors")
-        driverEndpoint.askWithReply[Boolean](StopExecutors)
+        driverEndpoint.askWithRetry[Boolean](StopExecutors)
       }
     } catch {
       case e: Exception =>
@@ -264,7 +264,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     stopExecutors()
     try {
       if (driverEndpoint != null) {
-        driverEndpoint.askWithReply[Boolean](StopDriver)
+        driverEndpoint.askWithRetry[Boolean](StopDriver)
       }
     } catch {
       case e: Exception =>
@@ -287,7 +287,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Called by subclasses when notified of a lost worker
   def removeExecutor(executorId: String, reason: String) {
     try {
-      driverEndpoint.askWithReply[Boolean](RemoveExecutor(executorId, reason))
+      driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
     } catch {
       case e: Exception =>
         throw new SparkException("Error notifying standalone scheduler's driver endpoint", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index d987c7d..2a3a5d9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -53,14 +53,14 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpoint.askWithReply[Boolean](RequestExecutors(requestedTotal))
+    yarnSchedulerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal))
   }
 
   /**
    * Request that the ApplicationMaster kill the specified executors.
    */
   override def doKillExecutors(executorIds: Seq[String]): Boolean = {
-    yarnSchedulerEndpoint.askWithReply[Boolean](KillExecutors(executorIds))
+    yarnSchedulerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
@@ -115,7 +115,7 @@ private[spark] abstract class YarnSchedulerBackend(
         amEndpoint match {
           case Some(am) =>
             Future {
-              context.reply(am.askWithReply[Boolean](r))
+              context.reply(am.askWithRetry[Boolean](r))
             } onFailure {
               case NonFatal(e) =>
                 logError(s"Sending $r to AM was unsuccessful", e)
@@ -130,7 +130,7 @@ private[spark] abstract class YarnSchedulerBackend(
         amEndpoint match {
           case Some(am) =>
             Future {
-              context.reply(am.askWithReply[Boolean](k))
+              context.reply(am.askWithRetry[Boolean](k))
             } onFailure {
               case NonFatal(e) =>
                 logError(s"Sending $k to AM was unsuccessful", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index ac5b524..e64d06c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -123,7 +123,7 @@ private[spark] class LocalBackend(
   }
 
   override def stop() {
-    localEndpoint.sendWithReply(StopExecutor)
+    localEndpoint.ask(StopExecutor)
   }
 
   override def reviveOffers() {

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index c798843..9bfc420 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -55,7 +55,7 @@ class BlockManagerMaster(
       memSize: Long,
       diskSize: Long,
       tachyonSize: Long): Boolean = {
-    val res = driverEndpoint.askWithReply[Boolean](
+    val res = driverEndpoint.askWithRetry[Boolean](
       UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
     logDebug(s"Updated info of block $blockId")
     res
@@ -63,12 +63,12 @@ class BlockManagerMaster(
 
   /** Get locations of the blockId from the driver */
   def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
-    driverEndpoint.askWithReply[Seq[BlockManagerId]](GetLocations(blockId))
+    driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
   }
 
   /** Get locations of multiple blockIds from the driver */
   def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
-    driverEndpoint.askWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
+    driverEndpoint.askWithRetry[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
   }
 
   /**
@@ -81,11 +81,11 @@ class BlockManagerMaster(
 
   /** Get ids of other nodes in the cluster from the driver */
   def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
-    driverEndpoint.askWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
+    driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
   def getRpcHostPortForExecutor(executorId: String): Option[(String, Int)] = {
-    driverEndpoint.askWithReply[Option[(String, Int)]](GetRpcHostPortForExecutor(executorId))
+    driverEndpoint.askWithRetry[Option[(String, Int)]](GetRpcHostPortForExecutor(executorId))
   }
 
   /**
@@ -93,12 +93,12 @@ class BlockManagerMaster(
    * blocks that the driver knows about.
    */
   def removeBlock(blockId: BlockId) {
-    driverEndpoint.askWithReply[Boolean](RemoveBlock(blockId))
+    driverEndpoint.askWithRetry[Boolean](RemoveBlock(blockId))
   }
 
   /** Remove all blocks belonging to the given RDD. */
   def removeRdd(rddId: Int, blocking: Boolean) {
-    val future = driverEndpoint.askWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
+    val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
@@ -110,7 +110,7 @@ class BlockManagerMaster(
 
   /** Remove all blocks belonging to the given shuffle. */
   def removeShuffle(shuffleId: Int, blocking: Boolean) {
-    val future = driverEndpoint.askWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
+    val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
@@ -122,7 +122,7 @@ class BlockManagerMaster(
 
   /** Remove all blocks belonging to the given broadcast. */
   def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
-    val future = driverEndpoint.askWithReply[Future[Seq[Int]]](
+    val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](
       RemoveBroadcast(broadcastId, removeFromMaster))
     future.onFailure {
       case e: Exception =>
@@ -141,11 +141,11 @@ class BlockManagerMaster(
    * amount of remaining memory.
    */
   def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    driverEndpoint.askWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
+    driverEndpoint.askWithRetry[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
   }
 
   def getStorageStatus: Array[StorageStatus] = {
-    driverEndpoint.askWithReply[Array[StorageStatus]](GetStorageStatus)
+    driverEndpoint.askWithRetry[Array[StorageStatus]](GetStorageStatus)
   }
 
   /**
@@ -166,7 +166,7 @@ class BlockManagerMaster(
      * master endpoint for a response to a prior message.
      */
     val response = driverEndpoint.
-      askWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
+      askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
     val result = Await.result(Future.sequence(futures), timeout)
     if (result == null) {
@@ -190,7 +190,7 @@ class BlockManagerMaster(
       filter: BlockId => Boolean,
       askSlaves: Boolean): Seq[BlockId] = {
     val msg = GetMatchingBlockIds(filter, askSlaves)
-    val future = driverEndpoint.askWithReply[Future[Seq[BlockId]]](msg)
+    val future = driverEndpoint.askWithRetry[Future[Seq[BlockId]]](msg)
     Await.result(future, timeout)
   }
 
@@ -205,7 +205,7 @@ class BlockManagerMaster(
 
   /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */
   private def tell(message: Any) {
-    if (!driverEndpoint.askWithReply[Boolean](message)) {
+    if (!driverEndpoint.askWithRetry[Boolean](message)) {
       throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 4682167..7212362 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -132,7 +132,7 @@ class BlockManagerMasterEndpoint(
     val removeMsg = RemoveRdd(rddId)
     Future.sequence(
       blockManagerInfo.values.map { bm =>
-        bm.slaveEndpoint.sendWithReply[Int](removeMsg)
+        bm.slaveEndpoint.ask[Int](removeMsg)
       }.toSeq
     )
   }
@@ -142,7 +142,7 @@ class BlockManagerMasterEndpoint(
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
       blockManagerInfo.values.map { bm =>
-        bm.slaveEndpoint.sendWithReply[Boolean](removeMsg)
+        bm.slaveEndpoint.ask[Boolean](removeMsg)
       }.toSeq
     )
   }
@@ -159,7 +159,7 @@ class BlockManagerMasterEndpoint(
     }
     Future.sequence(
       requiredBlockManagers.map { bm =>
-        bm.slaveEndpoint.sendWithReply[Int](removeMsg)
+        bm.slaveEndpoint.ask[Int](removeMsg)
       }.toSeq
     )
   }
@@ -214,7 +214,7 @@ class BlockManagerMasterEndpoint(
           // Remove the block from the slave's BlockManager.
           // Doesn't actually wait for a confirmation and the message might get lost.
           // If message loss becomes frequent, we should add retry logic here.
-          blockManager.get.slaveEndpoint.sendWithReply[Boolean](RemoveBlock(blockId))
+          blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
         }
       }
     }
@@ -253,7 +253,7 @@ class BlockManagerMasterEndpoint(
     blockManagerInfo.values.map { info =>
       val blockStatusFuture =
         if (askSlaves) {
-          info.slaveEndpoint.sendWithReply[Option[BlockStatus]](getBlockStatus)
+          info.slaveEndpoint.ask[Option[BlockStatus]](getBlockStatus)
         } else {
           Future { info.getStatus(blockId) }
         }
@@ -277,7 +277,7 @@ class BlockManagerMasterEndpoint(
       blockManagerInfo.values.map { info =>
         val future =
           if (askSlaves) {
-            info.slaveEndpoint.sendWithReply[Seq[BlockId]](getMatchingBlockIds)
+            info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds)
           } else {
             Future { info.blocks.keys.filter(filter).toSeq }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 0fd570e..b789912 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -48,7 +48,7 @@ class HeartbeatReceiverSuite extends FunSuite with LocalSparkContext {
 
     val metrics = new TaskMetrics
     val blockManagerId = BlockManagerId("executor-1", "localhost", 12345)
-    val response = receiverRef.askWithReply[HeartbeatResponse](
+    val response = receiverRef.askWithRetry[HeartbeatResponse](
       Heartbeat("executor-1", Array(1L -> metrics), blockManagerId))
 
     verify(scheduler).executorHeartbeatReceived(
@@ -71,7 +71,7 @@ class HeartbeatReceiverSuite extends FunSuite with LocalSparkContext {
 
     val metrics = new TaskMetrics
     val blockManagerId = BlockManagerId("executor-1", "localhost", 12345)
-    val response = receiverRef.askWithReply[HeartbeatResponse](
+    val response = receiverRef.askWithRetry[HeartbeatResponse](
       Heartbeat("executor-1", Array(1L -> metrics), blockManagerId))
 
     verify(scheduler).executorHeartbeatReceived(

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 44c88b0..ae3339d 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -100,8 +100,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     }
     val rpcEndpointRef = env.setupEndpoint("send-ref", endpoint)
 
-    val newRpcEndpointRef = rpcEndpointRef.askWithReply[RpcEndpointRef]("Hello")
-    val reply = newRpcEndpointRef.askWithReply[String]("Echo")
+    val newRpcEndpointRef = rpcEndpointRef.askWithRetry[RpcEndpointRef]("Hello")
+    val reply = newRpcEndpointRef.askWithRetry[String]("Echo")
     assert("Echo" === reply)
   }
 
@@ -115,7 +115,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
         }
       }
     })
-    val reply = rpcEndpointRef.askWithReply[String]("hello")
+    val reply = rpcEndpointRef.askWithRetry[String]("hello")
     assert("hello" === reply)
   }
 
@@ -134,7 +134,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely")
     try {
-      val reply = rpcEndpointRef.askWithReply[String]("hello")
+      val reply = rpcEndpointRef.askWithRetry[String]("hello")
       assert("hello" === reply)
     } finally {
       anotherEnv.shutdown()
@@ -162,7 +162,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
     try {
       val e = intercept[Exception] {
-        rpcEndpointRef.askWithReply[String]("hello", 1 millis)
+        rpcEndpointRef.askWithRetry[String]("hello", 1 millis)
       }
       assert(e.isInstanceOf[TimeoutException] || e.getCause.isInstanceOf[TimeoutException])
     } finally {
@@ -399,7 +399,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
       }
     })
 
-    val f = endpointRef.sendWithReply[String]("Hi")
+    val f = endpointRef.ask[String]("Hi")
     val ack = Await.result(f, 5 seconds)
     assert("ack" === ack)
 
@@ -419,7 +419,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "sendWithReply-remotely")
     try {
-      val f = rpcEndpointRef.sendWithReply[String]("hello")
+      val f = rpcEndpointRef.ask[String]("hello")
       val ack = Await.result(f, 5 seconds)
       assert("ack" === ack)
     } finally {
@@ -437,7 +437,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
       }
     })
 
-    val f = endpointRef.sendWithReply[String]("Hi")
+    val f = endpointRef.ask[String]("Hi")
     val e = intercept[SparkException] {
       Await.result(f, 5 seconds)
     }
@@ -460,7 +460,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val rpcEndpointRef = anotherEnv.setupEndpointRef(
       "local", env.address, "sendWithReply-remotely-error")
     try {
-      val f = rpcEndpointRef.sendWithReply[String]("hello")
+      val f = rpcEndpointRef.ask[String]("hello")
       val e = intercept[SparkException] {
         Await.result(f, 5 seconds)
       }
@@ -529,7 +529,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     val rpcEndpointRef = anotherEnv.setupEndpointRef(
       "local", env.address, "sendWithReply-unserializable-error")
     try {
-      val f = rpcEndpointRef.sendWithReply[String]("hello")
+      val f = rpcEndpointRef.ask[String]("hello")
       intercept[TimeoutException] {
         Await.result(f, 1 seconds)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6957bc7..f5b410f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -356,7 +356,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    val reregister = !master.driverEndpoint.askWithReply[Boolean](
+    val reregister = !master.driverEndpoint.askWithRetry[Boolean](
       BlockManagerHeartbeat(store.blockManagerId))
     assert(reregister == true)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/687273d9/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 89af403..f237936 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
 
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
-    trackerEndpoint.askWithReply[Boolean](AddBlock(blockInfo))
+    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
     logDebug(s"Reported block $blockId")
   }
 
@@ -169,13 +169,13 @@ private[streaming] class ReceiverSupervisorImpl(
   override protected def onReceiverStart() {
     val msg = RegisterReceiver(
       streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
-    trackerEndpoint.askWithReply[Boolean](msg)
+    trackerEndpoint.askWithRetry[Boolean](msg)
   }
 
   override protected def onReceiverStop(message: String, error: Option[Throwable]) {
     logInfo("Deregistering receiver " + streamId)
     val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
-    trackerEndpoint.askWithReply[Boolean](DeregisterReceiver(streamId, message, errorString))
+    trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
     logInfo("Stopped receiver " + streamId)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org