You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/04/22 00:09:56 UTC
[spark] branch branch-2.3 updated: [SPARK-27496][CORE] Fatal errors
should also be sent back to the sender
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 257abc4 [SPARK-27496][CORE] Fatal errors should also be sent back to the sender
257abc4 is described below
commit 257abc476dae1a7af68d4e55db7c0afeea2bf831
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Sun Apr 21 17:00:07 2019 -0700
[SPARK-27496][CORE] Fatal errors should also be sent back to the sender
## What changes were proposed in this pull request?
When a fatal error (such as StackOverflowError) is thrown from "receiveAndReply", we should try our best to notify the sender. Otherwise, the sender will hang until it times out.
In addition, when a MessageLoop dies unexpectedly, it should submit a replacement MessageLoop so that the Dispatcher keeps working.
## How was this patch tested?
New unit tests.
Closes #24396 from zsxwing/SPARK-27496.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 009059e3c261a73d605bc49aee4aecb0eb0e8267)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../org/apache/spark/rpc/netty/Dispatcher.scala | 10 +++-
.../scala/org/apache/spark/rpc/netty/Inbox.scala | 2 +-
.../apache/spark/rpc/netty/NettyRpcEnvSuite.scala | 53 +++++++++++++++++++++-
3 files changed, 62 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 904c4d0..f261635 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -224,7 +224,15 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
}
}
} catch {
- case ie: InterruptedException => // exit
+ case _: InterruptedException => // exit
+ case t: Throwable =>
+ try {
+ // Re-submit a MessageLoop so that Dispatcher will still work if
+ // UncaughtExceptionHandler decides to not kill JVM.
+ threadpool.execute(new MessageLoop)
+ } finally {
+ throw t
+ }
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
index d32eba6..44d2622 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
@@ -106,7 +106,7 @@ private[netty] class Inbox(
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
- case NonFatal(e) =>
+ case e: Throwable =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index f9481f8..59b4b70 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -17,13 +17,20 @@
package org.apache.spark.rpc.netty
+import java.util.concurrent.ExecutionException
+
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.mockito.MockitoSugar
import org.apache.spark._
import org.apache.spark.network.client.TransportClient
import org.apache.spark.rpc._
-class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
+class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar with TimeLimits {
+
+ private implicit val signaler: Signaler = ThreadSignaler
override def createRpcEnv(
conf: SparkConf,
@@ -84,4 +91,48 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
msg3,
RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
}
+
+ test("StackOverflowError should be sent back and Dispatcher should survive") {
+ val numUsableCores = 2
+ val conf = new SparkConf
+ val config = RpcEnvConfig(
+ conf,
+ "test",
+ "localhost",
+ "localhost",
+ 0,
+ new SecurityManager(conf),
+ numUsableCores,
+ clientMode = false)
+ val anotherEnv = new NettyRpcEnvFactory().create(config)
+ anotherEnv.setupEndpoint("StackOverflowError", new RpcEndpoint {
+ override val rpcEnv = anotherEnv
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ // scalastyle:off throwerror
+ case msg: String => throw new StackOverflowError
+ // scalastyle:on throwerror
+ case num: Int => context.reply(num)
+ }
+ })
+
+ val rpcEndpointRef = env.setupEndpointRef(anotherEnv.address, "StackOverflowError")
+ try {
+ // Send `numUsableCores` messages to trigger `numUsableCores` `StackOverflowError`s
+ for (_ <- 0 until numUsableCores) {
+ val e = intercept[SparkException] {
+ rpcEndpointRef.askSync[String]("hello")
+ }
+ // The root cause is `e.getCause.getCause` because it is boxed by Scala Promise.
+ assert(e.getCause.isInstanceOf[ExecutionException])
+ assert(e.getCause.getCause.isInstanceOf[StackOverflowError])
+ }
+ failAfter(10.seconds) {
+ assert(rpcEndpointRef.askSync[Int](100) === 100)
+ }
+ } finally {
+ anotherEnv.shutdown()
+ anotherEnv.awaitTermination()
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org