Posted to commits@spark.apache.org by do...@apache.org on 2019/04/22 00:09:56 UTC

[spark] branch branch-2.3 updated: [SPARK-27496][CORE] Fatal errors should also be sent back to the sender

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 257abc4  [SPARK-27496][CORE] Fatal errors should also be sent back to the sender
257abc4 is described below

commit 257abc476dae1a7af68d4e55db7c0afeea2bf831
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Sun Apr 21 17:00:07 2019 -0700

    [SPARK-27496][CORE] Fatal errors should also be sent back to the sender
    
    ## What changes were proposed in this pull request?
    
    When a fatal error (such as StackOverflowError) is thrown from "receiveAndReply", we should try our best to notify the sender. Otherwise, the sender will hang until it times out.
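    
    As a minimal, self-contained sketch of the idea (the names below are hypothetical, not Spark's actual API): catching only `NonFatal(e)` lets a fatal error skip the reply path entirely, so the promise backing the sender's future is never completed; catching `Throwable` fails it first and then rethrows.
    
    ```scala
    import scala.concurrent.Promise
    
    object ReplyOnFatalSketch {
      // `reply` is a hypothetical stand-in for the RpcCallContext that backs
      // the sender's future; completing it is what unblocks the caller.
      def handleAndReply(message: Any, reply: Promise[Any])(process: Any => Any): Unit = {
        try {
          reply.success(process(message))
        } catch {
          // `case NonFatal(e)` would skip StackOverflowError and friends,
          // leaving the sender to hang until its ask timeout. Catching
          // Throwable fails the promise first, then rethrows so the
          // endpoint's own error handling still runs.
          case e: Throwable =>
            reply.failure(e)
            throw e
        }
      }
    }
    ```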
    
    In addition, when a MessageLoop dies unexpectedly, it should resubmit a new one so that the Dispatcher keeps working.
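    
    A sketch of that self-healing loop under the same caveat (hypothetical names, not the real Dispatcher): a loop thread that dies with anything other than InterruptedException schedules a replacement before rethrowing, so the pool does not silently lose workers.
    
    ```scala
    import java.util.concurrent.{Executors, LinkedBlockingQueue}
    
    object MessageLoopSketch {
      private val queue = new LinkedBlockingQueue[Runnable]()
      private val threadpool = Executors.newFixedThreadPool(2)
    
      private class MessageLoop extends Runnable {
        override def run(): Unit = {
          try {
            while (true) {
              // Processing a message may throw a fatal error such as
              // StackOverflowError.
              queue.take().run()
            }
          } catch {
            case _: InterruptedException => // normal exit on shutdown
            case t: Throwable =>
              try {
                // Re-submit a replacement loop so dispatching continues if
                // the UncaughtExceptionHandler decides not to kill the JVM.
                threadpool.execute(new MessageLoop)
              } finally {
                throw t
              }
          }
        }
      }
    
      def start(): Unit = (0 until 2).foreach(_ => threadpool.execute(new MessageLoop))
    }
    ```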
    
    ## How was this patch tested?
    
    New unit tests.
    
    Closes #24396 from zsxwing/SPARK-27496.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 009059e3c261a73d605bc49aee4aecb0eb0e8267)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/rpc/netty/Dispatcher.scala    | 10 +++-
 .../scala/org/apache/spark/rpc/netty/Inbox.scala   |  2 +-
 .../apache/spark/rpc/netty/NettyRpcEnvSuite.scala  | 53 +++++++++++++++++++++-
 3 files changed, 62 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 904c4d0..f261635 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -224,7 +224,15 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
           }
         }
       } catch {
-        case ie: InterruptedException => // exit
+        case _: InterruptedException => // exit
+        case t: Throwable =>
+          try {
+            // Re-submit a MessageLoop so that Dispatcher will still work if
+            // UncaughtExceptionHandler decides to not kill JVM.
+            threadpool.execute(new MessageLoop)
+          } finally {
+            throw t
+          }
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
index d32eba6..44d2622 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
@@ -106,7 +106,7 @@ private[netty] class Inbox(
                 throw new SparkException(s"Unsupported message $message from ${_sender}")
               })
             } catch {
-              case NonFatal(e) =>
+              case e: Throwable =>
                 context.sendFailure(e)
                 // Throw the exception -- this exception will be caught by the safelyCall function.
                 // The endpoint's onError function will be called.
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index f9481f8..59b4b70 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -17,13 +17,20 @@
 
 package org.apache.spark.rpc.netty
 
+import java.util.concurrent.ExecutionException
+
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.network.client.TransportClient
 import org.apache.spark.rpc._
 
-class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
+class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar with TimeLimits {
+
+  private implicit val signaler: Signaler = ThreadSignaler
 
   override def createRpcEnv(
       conf: SparkConf,
@@ -84,4 +91,48 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
       msg3,
       RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
   }
+
+  test("StackOverflowError should be sent back and Dispatcher should survive") {
+    val numUsableCores = 2
+    val conf = new SparkConf
+    val config = RpcEnvConfig(
+      conf,
+      "test",
+      "localhost",
+      "localhost",
+      0,
+      new SecurityManager(conf),
+      numUsableCores,
+      clientMode = false)
+    val anotherEnv = new NettyRpcEnvFactory().create(config)
+    anotherEnv.setupEndpoint("StackOverflowError", new RpcEndpoint {
+      override val rpcEnv = anotherEnv
+
+      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+        // scalastyle:off throwerror
+        case msg: String => throw new StackOverflowError
+        // scalastyle:on throwerror
+        case num: Int => context.reply(num)
+      }
+    })
+
+    val rpcEndpointRef = env.setupEndpointRef(anotherEnv.address, "StackOverflowError")
+    try {
+      // Send `numUsableCores` messages to trigger `numUsableCores` `StackOverflowError`s
+      for (_ <- 0 until numUsableCores) {
+        val e = intercept[SparkException] {
+          rpcEndpointRef.askSync[String]("hello")
+        }
+        // The root cause is `e.getCause.getCause` because the error is boxed by the Scala Promise.
+        assert(e.getCause.isInstanceOf[ExecutionException])
+        assert(e.getCause.getCause.isInstanceOf[StackOverflowError])
+      }
+      failAfter(10.seconds) {
+        assert(rpcEndpointRef.askSync[Int](100) === 100)
+      }
+    } finally {
+      anotherEnv.shutdown()
+      anotherEnv.awaitTermination()
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org