You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/05/22 15:38:43 UTC

[spark] branch branch-2.3 updated: [SPARK-25139][SPARK-18406][CORE][BRANCH-2.3] Avoid NonFatals to kill the Executor in PythonRunner

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new e56a9b1  [SPARK-25139][SPARK-18406][CORE][BRANCH-2.3] Avoid NonFatals to kill the Executor in PythonRunner
e56a9b1 is described below

commit e56a9b19685b0ba089a0d6979d74f6592b9029d1
Author: Xingbo Jiang <xi...@databricks.com>
AuthorDate: Wed May 22 08:38:19 2019 -0700

    [SPARK-25139][SPARK-18406][CORE][BRANCH-2.3] Avoid NonFatals to kill the Executor in PythonRunner
    
    ## What changes were proposed in this pull request?
    
    Python uses a prefetch approach: it reads results from upstream and serves them in another thread. If the child operator doesn't consume all the data, Task cleanup may therefore happen before the Python-side read process finishes. This creates a race condition: the block read locks are freed during Task cleanup, and then the reader tries to release the read lock it holds and finds that it has already been released. In this case we hit an AssertionError.
    
    We should catch the AssertionError in PythonRunner and prevent it from killing the Executor.
    
    ## How was this patch tested?
    
    It is hard to write a unit test for this case; the fix was manually verified with a failed job.
    
    Closes #24670 from rezasafi/branch-2.3.
    
    Authored-by: Xingbo Jiang <xi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/api/python/PythonRDD.scala    |  2 +-
 .../org/apache/spark/api/python/PythonRunner.scala | 34 ++++++++++++----------
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 146f754..b3f33fb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -88,7 +88,7 @@ private[spark] case class PythonFunction(
 private[spark] case class ChainedPythonFunctions(funcs: Seq[PythonFunction])
 
 /** Thrown for exceptions in user Python code. */
-private[spark] class PythonException(msg: String, cause: Exception)
+private[spark] class PythonException(msg: String, cause: Throwable)
   extends RuntimeException(msg, cause)
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f7d1461..d88757b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
@@ -143,15 +144,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       context: TaskContext)
     extends Thread(s"stdout writer for $pythonExec") {
 
-    @volatile private var _exception: Exception = null
+    @volatile private var _exception: Throwable = null
 
     private val pythonIncludes = funcs.flatMap(_.funcs.flatMap(_.pythonIncludes.asScala)).toSet
     private val broadcastVars = funcs.flatMap(_.funcs.flatMap(_.broadcastVars.asScala))
 
     setDaemon(true)
 
-    /** Contains the exception thrown while writing the parent iterator to the Python process. */
-    def exception: Option[Exception] = Option(_exception)
+    /** Contains the throwable thrown while writing the parent iterator to the Python process. */
+    def exception: Option[Throwable] = Option(_exception)
 
     /** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */
     def shutdownOnTaskCompletion() {
@@ -251,18 +252,21 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         dataOut.writeInt(SpecialLengths.END_OF_STREAM)
         dataOut.flush()
       } catch {
-        case e: Exception if context.isCompleted || context.isInterrupted =>
-          logDebug("Exception thrown after task completion (likely due to cleanup)", e)
-          if (!worker.isClosed) {
-            Utils.tryLog(worker.shutdownOutput())
-          }
-
-        case e: Exception =>
-          // We must avoid throwing exceptions here, because the thread uncaught exception handler
-          // will kill the whole executor (see org.apache.spark.executor.Executor).
-          _exception = e
-          if (!worker.isClosed) {
-            Utils.tryLog(worker.shutdownOutput())
+        case t: Throwable if (NonFatal(t) || t.isInstanceOf[Exception]) =>
+          if (context.isCompleted || context.isInterrupted) {
+            logDebug("Exception/NonFatal Error thrown after task completion (likely due to " +
+              "cleanup)", t)
+            if (!worker.isClosed) {
+              Utils.tryLog(worker.shutdownOutput())
+            }
+          } else {
+            // We must avoid throwing exceptions/NonFatals here, because the thread uncaught
+            // exception handler will kill the whole executor (see
+            // org.apache.spark.executor.Executor).
+            _exception = t
+            if (!worker.isClosed) {
+              Utils.tryLog(worker.shutdownOutput())
+            }
           }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org