Posted to commits@spark.apache.org by tg...@apache.org on 2020/06/19 14:59:51 UTC

[spark] branch master updated: [SPARK-31029] Avoid using global execution context in driver main thread for YarnSchedulerBackend

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c34e45  [SPARK-31029] Avoid using global execution context in driver main thread for YarnSchedulerBackend
3c34e45 is described below

commit 3c34e45df4b3e0610dde5334716025a85cbbc05b
Author: Shanyu Zhao <sh...@microsoft.com>
AuthorDate: Fri Jun 19 09:59:14 2020 -0500

    [SPARK-31029] Avoid using global execution context in driver main thread for YarnSchedulerBackend
    
    ### What changes were proposed in this pull request?
    In YarnSchedulerBackend, we should avoid using the global execution context for its Futures. Otherwise, if the user's Spark application also uses the global execution context for its Futures, the user faces nondeterministic behavior with respect to the threads' context class loader.
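A minimal Scala sketch of the pattern this change applies, assuming a simplified, hypothetical class (SchedulerEndpointSketch and handleRequest are illustrative names, not Spark APIs). The class declares its own private implicit ExecutionContext backed by one named daemon thread, so its Futures never run on ExecutionContext.global. The thread factory is written inline with java.util.concurrent rather than using Spark's ThreadUtils.newDaemonSingleThreadExecutor.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical, simplified backend; names are illustrative, not Spark's.
class SchedulerEndpointSketch {
  // One named daemon thread, similar in spirit to
  // ThreadUtils.newDaemonSingleThreadExecutor("yarn-scheduler-endpoint").
  private val threadFactory: ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "scheduler-endpoint-sketch")
      t.setDaemon(true)
      t
    }
  }

  // Private implicit EC: every Future created in this class runs on the thread
  // above, never on scala.concurrent.ExecutionContext.global.
  private implicit val endpointEC: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(threadFactory))

  // Hypothetical request handler; reports which thread ran it.
  def handleRequest(n: Int): Future[String] = Future {
    s"request $n handled on ${Thread.currentThread().getName}"
  }

  def stop(): Unit = endpointEC.shutdown()
}

object SchedulerEndpointSketchDemo {
  def main(args: Array[String]): Unit = {
    val backend = new SchedulerEndpointSketch
    try println(Await.result(backend.handleRequest(1), 5.seconds))
    finally backend.stop()
  }
}
```

Because the implicit is private to the class, user code that imports ExecutionContext.Implicits.global is unaffected, and the two never share pool threads.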
    
    ### Why are the changes needed?
    When running the TPC-DS test (https://github.com/databricks/spark-sql-perf), we occasionally see a class-not-found error:
    
    2020-02-04 20:00:26,673 ERROR yarn.ApplicationMaster: User class threw exception: scala.ScalaReflectionException: class com.databricks.spark.sql.perf.ExperimentRun in JavaMirror with
    sun.misc.Launcher$AppClassLoader@28ba21f3 of type class sun.misc.Launcher$AppClassLoader with classpath [...]
    and parent being sun.misc.Launcher$ExtClassLoader@3ff5d147 of type class sun.misc.Launcher$ExtClassLoader with classpath [...]
    and parent being primordial classloader with boot classpath [...] not found.
    
    The root cause of the problem is as follows:
    
    The Spark driver starts the ApplicationMaster in the main thread, which starts a user thread and sets a MutableURLClassLoader as that thread's ContextClassLoader:
      userClassThread = startUserApplication()
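A hedged sketch of that setup, in which only the user thread gets a custom context class loader while the calling ("main") thread keeps its own. It assumes a plain java.net.URLClassLoader as a stand-in for Spark's MutableURLClassLoader, and startUserThread is a hypothetical, much-simplified analogue of startUserApplication() (the real method does considerably more).

```scala
import java.net.{URL, URLClassLoader}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper; not Spark's ApplicationMaster code.
object UserThreadSketch {
  // Starts `body` on a new thread whose context class loader is `userLoader`.
  def startUserThread(userLoader: ClassLoader)(body: => Unit): Thread = {
    val t = new Thread(new Runnable { def run(): Unit = body }, "user-class-thread-sketch")
    // Only this thread gets the user-aware loader; the caller keeps its own.
    t.setContextClassLoader(userLoader)
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    val userLoader =
      new URLClassLoader(Array.empty[URL], Thread.currentThread().getContextClassLoader)
    val seen = new AtomicReference[ClassLoader]()
    val t = startUserThread(userLoader) { seen.set(Thread.currentThread().getContextClassLoader) }
    t.join()
    println(s"user thread sees user loader: ${seen.get eq userLoader}")
    println(s"main thread sees user loader: ${Thread.currentThread().getContextClassLoader eq userLoader}")
  }
}
```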
    
    The main thread then sets up the YarnSchedulerBackend RPC endpoints, which handle these calls using Scala Futures on the default global ExecutionContext:
      doRequestTotalExecutors
      doKillExecutors
    
    So the main thread and the user thread race: whichever starts a Future first determines the ContextClassLoader of the default thread pool's threads, since a pool thread inherits the ContextClassLoader of the thread that caused it to be created:
    
    - If the main thread starts a Future to handle doKillExecutors() before the user thread does, the default thread pool's threads get the default ContextClassLoader (AppClassLoader).
    - If the user thread starts a Future first, the thread pool's threads get MutableURLClassLoader.
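The race can be reproduced outside Spark with a small sketch. It assumes a one-thread java.util.concurrent pool as a stand-in for the global ExecutionContext, and relies on two JDK behaviors: a ThreadPoolExecutor creates its worker lazily on the thread that submits the first task, and a new Thread inherits its creator's context class loader. FirstSubmitterWins is a hypothetical name.

```scala
import java.net.{URL, URLClassLoader}
import java.util.concurrent.{Callable, ExecutorService, Executors, TimeUnit}

// Hypothetical demo object; not part of Spark.
object FirstSubmitterWins {
  // Lets a helper thread whose context class loader is `firstLoader` submit the
  // first task, then submits a second task from the calling thread and returns
  // the context class loader the pool's worker reports during that second task.
  def observedLoader(firstLoader: ClassLoader): ClassLoader = {
    // Stand-in for a shared pool: one worker thread, created on the first submit.
    val pool: ExecutorService = Executors.newFixedThreadPool(1)
    try {
      val firstSubmitter = new Thread(new Runnable {
        def run(): Unit = pool.submit(new Runnable { def run(): Unit = () }).get()
      })
      firstSubmitter.setContextClassLoader(firstLoader)
      firstSubmitter.start()
      firstSubmitter.join()
      // The worker already exists, so this submit reuses it; the caller's own
      // context class loader plays no part.
      pool.submit(new Callable[ClassLoader] {
        def call(): ClassLoader = Thread.currentThread().getContextClassLoader
      }).get()
    } finally {
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }

  def main(args: Array[String]): Unit = {
    val userLoader =
      new URLClassLoader(Array.empty[URL], Thread.currentThread().getContextClassLoader)
    val seen = observedLoader(userLoader)
    println(s"worker uses the first submitter's loader: ${seen eq userLoader}")
  }
}
```

A later submitter cannot change what the existing worker sees, so the outcome depends only on which thread submits first.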
    
    Note that only MutableURLClassLoader can load user-provided classes for the Spark app, so you will see class-not-found errors if the ContextClassLoader is AppClassLoader.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests and manual tests
    
    Closes #27843 from shanyu/shanyu-31029.
    
    Authored-by: Shanyu Zhao <sh...@microsoft.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e428bab..0475b0a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -21,8 +21,7 @@ import java.util.EnumSet
 import java.util.concurrent.atomic.{AtomicBoolean}
 import javax.servlet.DispatcherType
 
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
@@ -67,6 +66,14 @@ private[spark] abstract class YarnSchedulerBackend(
 
   private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
 
+  /**
+   * Declare implicit single thread execution context for futures doRequestTotalExecutors and
+   * doKillExecutors below, avoiding using the global execution context that may cause conflict
+   * with user code's execution of futures.
+   */
+  private implicit val schedulerEndpointEC = ExecutionContext.fromExecutorService(
+      ThreadUtils.newDaemonSingleThreadExecutor("yarn-scheduler-endpoint"))
+
   /** Application ID. */
   protected var appId: Option[ApplicationId] = None
 

