Posted to commits@spark.apache.org by mr...@apache.org on 2020/08/14 22:11:26 UTC

[spark] branch master updated: [SPARK-32119][CORE] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a4c8f7  [SPARK-32119][CORE] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars
1a4c8f7 is described below

commit 1a4c8f718f748d8c2a39f0a3b209ce606bffe958
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Fri Aug 14 17:10:22 2020 -0500

    [SPARK-32119][CORE] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars
    
    ### What changes were proposed in this pull request?
    
    This PR changes Executor to load jars and files added by --jars and --files during Executor initialization.
    To avoid downloading those jars/files twice, they are associated with `startTime` as their upload timestamp; a toy sketch of the idea follows.
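    
    The sketch below illustrates the timestamp-based dedup in isolation. The name
    `DependencyRegistry` is invented for illustration only; the real logic lives in
    SparkContext's addFile/addJar (see the diff below), which track state in the
    `addedJars`/`addedFiles` maps.
    
        import scala.collection.concurrent.TrieMap
    
        class DependencyRegistry(appStartTime: Long) {
          private val added = TrieMap.empty[String, Long]
    
          // Returns true if `url` was newly registered. Dependencies passed at
          // submit time (--jars/--files) reuse the application's start time as
          // their timestamp, so an executor that already fetched them during
          // initialization sees an unchanged timestamp and skips re-downloading.
          def add(url: String, addedOnSubmit: Boolean): Boolean = {
            val timestamp = if (addedOnSubmit) appStartTime else System.currentTimeMillis()
            added.putIfAbsent(url, timestamp).isEmpty
          }
        }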
    
    ### Why are the changes needed?
    
    ExecutorPlugin doesn't work with Standalone Cluster or Kubernetes
    when a jar that contains the plugins, and the files used by those plugins, are added via the --jars and --files options of spark-submit.
    
    This is because jars and files added by --jars and --files are not loaded during Executor initialization.
    I confirmed it works with YARN because there the jars/files are distributed via the distributed cache.
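    
    For concreteness, the affected setup looks roughly like this (all file and
    class names below are hypothetical; `spark.plugins` is the standard config
    for registering SparkPlugin implementations):
    
        spark-submit \
          --master spark://master:7077 \
          --conf spark.plugins=com.example.MyPlugin \
          --jars plugin.jar \
          --files data.txt \
          app.jar
    
    Before this change, Standalone and Kubernetes executors initialized plugins
    before plugin.jar and data.txt were fetched, so the plugin class could not be
    found on the executor side.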
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Jars/files added by --jars and --files are now downloaded by each executor during its initialization.
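    
    As a result, an ExecutorPlugin distributed this way can read a file shipped
    with --files from the executor's working directory during init. A minimal
    Scala sketch (the class name MySparkPlugin and the file name data.txt are
    hypothetical; the pattern mirrors the new test case in the diff below):
    
        import java.util.{Map => JMap}
        import scala.io.Source
        import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
    
        class MySparkPlugin extends SparkPlugin {
          override def driverPlugin(): DriverPlugin = null
    
          override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
            // Runs during executor initialization; with this change, files
            // added via --files are already downloaded into the executor's
            // working directory at this point.
            override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
              val src = Source.fromFile("data.txt")
              try println(s"plugin read: ${src.getLines().next()}")
              finally src.close()
            }
          }
        }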
    
    ### How was this patch tested?
    
    Added a new test case.
    
    Closes #28939 from sarutak/fix-plugin-issue.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../main/scala/org/apache/spark/SparkContext.scala | 27 +++++--
 .../scala/org/apache/spark/executor/Executor.scala | 14 ++++
 .../org/apache/spark/JavaSparkContextSuite.java    |  7 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 83 +++++++++++++++++++++-
 4 files changed, 122 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5e0eaa4..85a24ac 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -384,6 +384,7 @@ class SparkContext(config: SparkConf) extends Logging {
   try {
     _conf = config.clone()
     _conf.validateSettings()
+    _conf.set("spark.app.startTime", startTime.toString)
 
     if (!_conf.contains("spark.master")) {
       throw new SparkException("A master URL must be set in your configuration")
@@ -492,11 +493,17 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Add each JAR given through the constructor
     if (jars != null) {
-      jars.foreach(addJar)
+      jars.foreach(jar => addJar(jar, true))
+      if (addedJars.nonEmpty) {
+        _conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(","))
+      }
     }
 
     if (files != null) {
-      files.foreach(addFile)
+      files.foreach(file => addFile(file, false, true))
+      if (addedFiles.nonEmpty) {
+        _conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(","))
+      }
     }
 
     _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
@@ -1500,7 +1507,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String): Unit = {
-    addFile(path, false)
+    addFile(path, false, false)
   }
 
   /**
@@ -1522,6 +1529,10 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
+    addFile(path, recursive, false)
+  }
+
+  private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
     val uri = new Path(path).toUri
     val schemeCorrectedURI = uri.getScheme match {
       case null => new File(path).getCanonicalFile.toURI
@@ -1559,7 +1570,7 @@ class SparkContext(config: SparkConf) extends Logging {
           path
         }
     }
-    val timestamp = System.currentTimeMillis
+    val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
     if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
       logInfo(s"Added file $path at $key with timestamp $timestamp")
       // Fetch the file locally so that closures which are run on the driver can still use the
@@ -1569,7 +1580,7 @@ class SparkContext(config: SparkConf) extends Logging {
       postEnvironmentUpdate()
     } else {
       logWarning(s"The path $path has been added already. Overwriting of added paths " +
-       "is not supported in the current version.")
+        "is not supported in the current version.")
     }
   }
 
@@ -1840,6 +1851,10 @@ class SparkContext(config: SparkConf) extends Logging {
    * @note A path can be added only once. Subsequent additions of the same path are ignored.
    */
   def addJar(path: String): Unit = {
+    addJar(path, false)
+  }
+
+  private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
     def addLocalJarFile(file: File): String = {
       try {
         if (!file.exists()) {
@@ -1904,7 +1919,7 @@ class SparkContext(config: SparkConf) extends Logging {
         }
       }
       if (key != null) {
-        val timestamp = System.currentTimeMillis
+        val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
         if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
           logInfo(s"Added JAR $path at $key with timestamp $timestamp")
           postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d220029..9a191e1 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -220,6 +220,20 @@ private[spark] class Executor(
 
   heartbeater.start()
 
+  private val appStartTime = conf.getLong("spark.app.startTime", 0)
+
+  // To allow users to distribute plugins and their required files
+  // specified by --jars and --files on application submission, those jars/files should be
+  // downloaded and added to the class loader via updateDependencies.
+  // This should be done before plugin initialization below
+  // because executors search plugins from the class loader and initialize them.
+  private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
+    conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
+      Map(urls.split(",").map(url => (url, appStartTime)): _*)
+    }.getOrElse(Map.empty)
+  }
+  updateDependencies(initialUserFiles, initialUserJars)
+
   // Plugins need to load using a class loader that includes the executor's user classpath.
   // Plugins also needs to be initialized after the heartbeater started
   // to avoid blocking to send heartbeat (see SPARK-32175).
diff --git a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
index 0f489fb..b188ee1 100644
--- a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import org.apache.spark.api.java.*;
 import org.apache.spark.*;
+import org.apache.spark.util.Utils;
 
 /**
  * Java apps can use both Java-friendly JavaSparkContext and Scala SparkContext.
@@ -35,14 +36,16 @@ import org.apache.spark.*;
 public class JavaSparkContextSuite implements Serializable {
 
   @Test
-  public void javaSparkContext() {
+  public void javaSparkContext() throws IOException {
+    File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
+    String dummyJarFile = File.createTempFile(tempDir.toString(), "jarFile").toString();
     String[] jars = new String[] {};
     java.util.Map<String, String> environment = new java.util.HashMap<>();
 
     new JavaSparkContext(new SparkConf().setMaster("local").setAppName("name")).stop();
     new JavaSparkContext("local", "name", new SparkConf()).stop();
     new JavaSparkContext("local", "name").stop();
-    new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop();
+    new JavaSparkContext("local", "name", "sparkHome", dummyJarFile).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars).stop();
     new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop();
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 35311d3..b5b3751 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,7 +570,8 @@ class SparkSubmitSuite
       }
     }
 
-    val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
+    val dummyJarFile = TestUtils.createJarWithClasses(Seq.empty)
+    val clArgs2 = Seq("--class", "org.SomeClass", dummyJarFile.toString)
     val appArgs2 = new SparkSubmitArguments(clArgs2)
     val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
     assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
@@ -1220,6 +1221,86 @@ class SparkSubmitSuite
     testRemoteResources(enableHttpFs = true, forceDownloadSchemes = Seq("*"))
   }
 
+  test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") {
+    val tempDir = Utils.createTempDir()
+    val tempFileName = "test.txt"
+    val tempFile = new File(tempDir, tempFileName)
+
+    // scalastyle:off println
+    Utils.tryWithResource {
+      new PrintWriter(tempFile)
+    } { writer =>
+      writer.println("SparkPluginTest")
+    }
+    // scalastyle:on println
+
+    val sparkPluginCodeBody =
+      """
+        |@Override
+        |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
+        |  return new TestExecutorPlugin();
+        |}
+        |
+        |@Override
+        |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
+      """.stripMargin
+    val executorPluginCodeBody =
+      s"""
+        |@Override
+        |public void init(
+        |    org.apache.spark.api.plugin.PluginContext ctx,
+        |    java.util.Map<String, String> extraConf) {
+        |  String str = null;
+        |  try (java.io.BufferedReader reader =
+        |    new java.io.BufferedReader(new java.io.InputStreamReader(
+        |      new java.io.FileInputStream("$tempFileName")))) {
+        |    str = reader.readLine();
+        |  } catch (java.io.IOException e) {
+        |    throw new RuntimeException(e);
+        |  } finally {
+        |    assert str == "SparkPluginTest";
+        |  }
+        |}
+      """.stripMargin
+
+    val compiledExecutorPlugin = TestUtils.createCompiledClass(
+      "TestExecutorPlugin",
+      tempDir,
+      "",
+      null,
+      Seq.empty,
+      Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
+      executorPluginCodeBody)
+
+    val thisClassPath =
+      sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
+    val compiledSparkPlugin = TestUtils.createCompiledClass(
+      "TestSparkPlugin",
+      tempDir,
+      "",
+      null,
+      Seq(tempDir.toURI.toURL) ++ thisClassPath,
+      Seq("org.apache.spark.api.plugin.SparkPlugin"),
+      sparkPluginCodeBody)
+
+    val jarUrl = TestUtils.createJar(
+      Seq(compiledSparkPlugin, compiledExecutorPlugin),
+      new File(tempDir, "testplugin.jar"))
+
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val unusedFile = Files.createTempFile(tempDir.toPath, "unused", null)
+    val args = Seq(
+      "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local-cluster[1,1,1024]",
+      "--conf", "spark.plugins=TestSparkPlugin",
+      "--conf", "spark.ui.enabled=false",
+      "--jars", jarUrl.toString + "," + unusedJar.toString,
+      "--files", tempFile.toString + "," + unusedFile.toString,
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+
   private def testRemoteResources(
       enableHttpFs: Boolean,
       forceDownloadSchemes: Seq[String] = Nil): Unit = {

