You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2020/09/17 17:23:49 UTC

[spark] branch branch-3.0 updated: [SPARK-32738][CORE][3.0] Should reduce the number of active threads if fatal error happens in `Inbox.process`

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 17a5195  [SPARK-32738][CORE][3.0] Should reduce the number of active threads if fatal error happens in `Inbox.process`
17a5195 is described below

commit 17a5195dead75a4dffd150f3f69fa4983e05561b
Author: Zhenhua Wang <wz...@163.com>
AuthorDate: Thu Sep 17 12:22:35 2020 -0500

    [SPARK-32738][CORE][3.0] Should reduce the number of active threads if fatal error happens in `Inbox.process`
    
    This is a backport for [pr#29580](https://github.com/apache/spark/pull/29580) to branch 3.0.
    
    ### What changes were proposed in this pull request?
    
    Processing for `ThreadSafeRpcEndpoint` is controlled by  `numActiveThreads` in `Inbox`. Now if any fatal error happens during `Inbox.process`, `numActiveThreads` is not reduced. Then other threads can not process messages in that inbox, which causes the endpoint to "hang". For other type of endpoints, we also should keep  `numActiveThreads` correct.
    
    This problem is more serious in previous Spark 2.x versions since the driver, executor and block manager endpoints are all thread safe endpoints.
    
    To fix this, we should reduce the number of active threads if fatal error happens in `Inbox.process`.
    
    ### Why are the changes needed?
    
    `numActiveThreads` is not correct when fatal error happens and will cause the described problem.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add a new test.
    
    Closes #29763 from wzhfy/deal_with_fatal_error_3.0.
    
    Authored-by: Zhenhua Wang <wz...@163.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/rpc/netty/Inbox.scala     | 20 ++++++++++++++++++++
 .../org/apache/spark/rpc/netty/InboxSuite.scala      | 13 +++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
index 2ed03f7..472401b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
@@ -200,6 +200,16 @@ private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
    * Calls action closure, and calls the endpoint's onError function in the case of exceptions.
    */
   private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
+    def dealWithFatalError(fatal: Throwable): Unit = {
+      inbox.synchronized {
+        assert(numActiveThreads > 0, "The number of active threads should be positive.")
+        // Should reduce the number of active threads before throw the error.
+        numActiveThreads -= 1
+      }
+      logError(s"An error happened while processing message in the inbox for $endpointName", fatal)
+      throw fatal
+    }
+
     try action catch {
       case NonFatal(e) =>
         try endpoint.onError(e) catch {
@@ -209,8 +219,18 @@ private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
             } else {
               logError("Ignoring error", ee)
             }
+          case fatal: Throwable =>
+            dealWithFatalError(fatal)
         }
+      case fatal: Throwable =>
+        dealWithFatalError(fatal)
     }
   }
 
+  // exposed only for testing
+  def getNumActiveThreads: Int = {
+    inbox.synchronized {
+      inbox.numActiveThreads
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
index c74c728..8b1c602 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
@@ -136,4 +136,17 @@ class InboxSuite extends SparkFunSuite {
 
     endpoint.verifySingleOnNetworkErrorMessage(cause, remoteAddress)
   }
+
+  test("SPARK-32738: should reduce the number of active threads when fatal error happens") {
+    val endpoint = mock(classOf[TestRpcEndpoint])
+    when(endpoint.receive).thenThrow(new OutOfMemoryError())
+
+    val dispatcher = mock(classOf[Dispatcher])
+    val inbox = new Inbox("name", endpoint)
+    inbox.post(OneWayMessage(null, "hi"))
+    intercept[OutOfMemoryError] {
+      inbox.process(dispatcher)
+    }
+    assert(inbox.getNumActiveThreads == 0)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org