You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/01/27 02:24:43 UTC

spark git commit: [SPARK-12614][CORE] Don't throw non fatal exception from ask

Repository: spark
Updated Branches:
  refs/heads/master eb917291c -> 22662b241


[SPARK-12614][CORE] Don't throw non fatal exception from ask

Right now, `RpcEndpointRef.ask` may throw an exception in some corner cases, such as when `ask` is called after the RpcEnv has been stopped. It is better to avoid throwing exceptions from `RpcEndpointRef.ask`; instead, we can deliver the exception through the future returned by `ask`.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #10568 from zsxwing/send-ask-fail.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22662b24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22662b24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22662b24

Branch: refs/heads/master
Commit: 22662b241629b56205719ede2f801a476e10a3cd
Parents: eb91729
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Jan 26 17:24:40 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Jan 26 17:24:40 2016 -0800

----------------------------------------------------------------------
 .../apache/spark/rpc/netty/NettyRpcEnv.scala    | 54 +++++++++++---------
 1 file changed, 29 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/22662b24/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index ef876b1..9ae74d9 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -211,33 +211,37 @@ private[netty] class NettyRpcEnv(
         }
     }
 
-    if (remoteAddr == address) {
-      val p = Promise[Any]()
-      p.future.onComplete {
-        case Success(response) => onSuccess(response)
-        case Failure(e) => onFailure(e)
-      }(ThreadUtils.sameThread)
-      dispatcher.postLocalMessage(message, p)
-    } else {
-      val rpcMessage = RpcOutboxMessage(serialize(message),
-        onFailure,
-        (client, response) => onSuccess(deserialize[Any](client, response)))
-      postToOutbox(message.receiver, rpcMessage)
-      promise.future.onFailure {
-        case _: TimeoutException => rpcMessage.onTimeout()
-        case _ =>
+    try {
+      if (remoteAddr == address) {
+        val p = Promise[Any]()
+        p.future.onComplete {
+          case Success(response) => onSuccess(response)
+          case Failure(e) => onFailure(e)
+        }(ThreadUtils.sameThread)
+        dispatcher.postLocalMessage(message, p)
+      } else {
+        val rpcMessage = RpcOutboxMessage(serialize(message),
+          onFailure,
+          (client, response) => onSuccess(deserialize[Any](client, response)))
+        postToOutbox(message.receiver, rpcMessage)
+        promise.future.onFailure {
+          case _: TimeoutException => rpcMessage.onTimeout()
+          case _ =>
+        }(ThreadUtils.sameThread)
+      }
+
+      val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
+        override def run(): Unit = {
+          onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
+        }
+      }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
+      promise.future.onComplete { v =>
+        timeoutCancelable.cancel(true)
       }(ThreadUtils.sameThread)
+    } catch {
+      case NonFatal(e) =>
+        onFailure(e)
     }
-
-    val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
-      override def run(): Unit = {
-        promise.tryFailure(
-          new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
-      }
-    }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
-    promise.future.onComplete { v =>
-      timeoutCancelable.cancel(true)
-    }(ThreadUtils.sameThread)
     promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org