You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/09/21 21:35:38 UTC

[spark] branch branch-2.4 updated: [SPARK-19147][CORE] Gracefully handle error in task after executor is stopped

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 1b939ea  [SPARK-19147][CORE] Gracefully handle error in task after executor is stopped
1b939ea is described below

commit 1b939ea0ce730dd170f4376d27d6b2e87095cef6
Author: colinma <co...@tencent.com>
AuthorDate: Sat Sep 21 07:31:39 2019 -0500

    [SPARK-19147][CORE] Gracefully handle error in task after executor is stopped
    
    ### What changes were proposed in this pull request?
    
    TransportClientFactory.createClient() is called by a task, and TransportClientFactory.close() is called by the executor.
    When the executor is stopped, close() sets workerGroup = null, so a NullPointerException occurs in createClient(), generating many exceptions in the log.
    An exception that occurs after close() is treated as an expected exception
    and transformed into an InterruptedException, which can be processed by the Executor.
    
    ### Why are the changes needed?
    
    The change reduces the exception stack traces in the log file, and users won't be confused by these expected exceptions.
    
    ### Does this PR introduce any user-facing change?
    
    N/A
    
    ### How was this patch tested?
    
    New tests are added in TransportClientFactorySuite and ExecutorSuite
    
    Closes #25759 from colinmjj/spark-19147.
    
    Authored-by: colinma <co...@tencent.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
    (cherry picked from commit 076186e8815ba6381f14941e05a6ec15348369ba)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/network/client/TransportClientFactory.java    | 3 +--
 .../java/org/apache/spark/network/TransportClientFactorySuite.java | 7 +++++++
 core/src/main/scala/org/apache/spark/SparkEnv.scala                | 2 +-
 core/src/main/scala/org/apache/spark/executor/Executor.scala       | 5 +++++
 4 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 16d242d..7aa5944 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -285,9 +285,8 @@ public class TransportClientFactory implements Closeable {
     }
     connectionPool.clear();
 
-    if (workerGroup != null) {
+    if (workerGroup != null && !workerGroup.isShuttingDown()) {
       workerGroup.shutdownGracefully();
-      workerGroup = null;
     }
   }
 }
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index 86482f1..28cfc32 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -215,4 +215,11 @@ public class TransportClientFactorySuite {
       assertFalse(c1.isActive());
     }
   }
+
+  @Test(expected = IOException.class)
+  public void closeFactoryBeforeCreateClient() throws IOException, InterruptedException {
+    TransportClientFactory factory = context.createClientFactory();
+    factory.close();
+    factory.createClient(TestUtils.getLocalHost(), server1.getPort());
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 72123f2..32d4f7f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -70,7 +70,7 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
 
-  private[spark] var isStopped = false
+  @volatile private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3348067..f7ff0b8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -578,6 +578,11 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))
 
+        case t: Throwable if env.isStopped =>
+          // Log the expected exception after executor.stop without stack traces
+          // see: SPARK-19147
+          logError(s"Exception in $taskName (TID $taskId): ${t.getMessage}")
+
         case t: Throwable =>
           // Attempt to exit cleanly by informing the driver of our failure.
           // If anything goes wrong (or this was a fatal exception), we will delegate to


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org