Posted to commits@spark.apache.org by mr...@apache.org on 2020/08/14 22:11:26 UTC
[spark] branch master updated: [SPARK-32119][CORE] ExecutorPlugin
doesn't work with Standalone Cluster and Kubernetes with --jars
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1a4c8f7 [SPARK-32119][CORE] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars
1a4c8f7 is described below
commit 1a4c8f718f748d8c2a39f0a3b209ce606bffe958
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Fri Aug 14 17:10:22 2020 -0500
[SPARK-32119][CORE] ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with --jars
### What changes were proposed in this pull request?
This PR changes Executor to load jars and files added by --jars and --files on Executor initialization.
To avoid downloading those jars/files twice, they are associated with `startTime` as their upload timestamp.
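As a rough illustration of why the `startTime` timestamp prevents a second download (a simplified sketch with hypothetical names, not the actual Executor code): the dependency-update path fetches a jar/file only when its recorded timestamp is older than the incoming one, so entries seeded with the application's `startTime` at submission are skipped when tasks later resend them with the same timestamp.

```scala
import scala.collection.mutable

// Simplified sketch of timestamp-based dedup (hypothetical helper, not Spark's code).
// `fetched` remembers the newest timestamp seen per URL; a URL already recorded with the
// application's startTime is not fetched again when it arrives with the same timestamp.
class DependencyTracker {
  private val fetched = mutable.HashMap[String, Long]()

  def updateDependencies(newDeps: Map[String, Long])(fetch: String => Unit): Unit = {
    for ((url, timestamp) <- newDeps) {
      if (fetched.getOrElse(url, -1L) < timestamp) {
        fetch(url)                 // download only when the entry is new or strictly newer
        fetched(url) = timestamp
      }
    }
  }
}
```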
### Why are the changes needed?
ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes
when a jar that contains plugins, and files used by those plugins, are added via the --jars and --files options of spark-submit.
This is because jars and files added by --jars and --files are not loaded on Executor initialization.
I confirmed it works with YARN because there the jars/files are distributed via the distributed cache.
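For a concrete picture of the affected scenario, a minimal Scala sketch of such a plugin might look like the following (illustrative only; the class names and `test.txt` are made up). The plugin jar would be passed with --jars, `test.txt` with --files, and the plugin enabled via the existing `spark.plugins` configuration:

```scala
import java.util.{Map => JMap}
import scala.io.Source

import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

// Illustrative plugin: the executor-side init() reads a file distributed with --files,
// so it only works if that file has already been fetched when plugins are initialized.
class MyExecutorPlugin extends ExecutorPlugin {
  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
    val source = Source.fromFile("test.txt")   // resolved from the executor's working directory
    try println(s"plugin read from --files: ${source.getLines().next()}")
    finally source.close()
  }
}

class MySparkPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = null
  override def executorPlugin(): ExecutorPlugin = new MyExecutorPlugin
}
```

On Standalone or Kubernetes, `init` above would previously fail at executor startup because neither the plugin jar nor `test.txt` had been fetched yet; on YARN the distributed cache made them available beforehand.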
### Does this PR introduce _any_ user-facing change?
Yes. Jars/files added by --jars and --files are now downloaded by each executor on initialization.
### How was this patch tested?
Added a new test case.
Closes #28939 from sarutak/fix-plugin-issue.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../main/scala/org/apache/spark/SparkContext.scala | 27 +++++--
.../scala/org/apache/spark/executor/Executor.scala | 14 ++++
.../org/apache/spark/JavaSparkContextSuite.java | 7 +-
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 83 +++++++++++++++++++++-
4 files changed, 122 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5e0eaa4..85a24ac 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -384,6 +384,7 @@ class SparkContext(config: SparkConf) extends Logging {
try {
_conf = config.clone()
_conf.validateSettings()
+ _conf.set("spark.app.startTime", startTime.toString)
if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
@@ -492,11 +493,17 @@ class SparkContext(config: SparkConf) extends Logging {
// Add each JAR given through the constructor
if (jars != null) {
- jars.foreach(addJar)
+ jars.foreach(jar => addJar(jar, true))
+ if (addedJars.nonEmpty) {
+ _conf.set("spark.app.initial.jar.urls", addedJars.keys.toSeq.mkString(","))
+ }
}
if (files != null) {
- files.foreach(addFile)
+ files.foreach(file => addFile(file, false, true))
+ if (addedFiles.nonEmpty) {
+ _conf.set("spark.app.initial.file.urls", addedFiles.keys.toSeq.mkString(","))
+ }
}
_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
@@ -1500,7 +1507,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addFile(path: String): Unit = {
- addFile(path, false)
+ addFile(path, false, false)
}
/**
@@ -1522,6 +1529,10 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addFile(path: String, recursive: Boolean): Unit = {
+ addFile(path, recursive, false)
+ }
+
+ private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
val uri = new Path(path).toUri
val schemeCorrectedURI = uri.getScheme match {
case null => new File(path).getCanonicalFile.toURI
@@ -1559,7 +1570,7 @@ class SparkContext(config: SparkConf) extends Logging {
path
}
}
- val timestamp = System.currentTimeMillis
+ val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added file $path at $key with timestamp $timestamp")
// Fetch the file locally so that closures which are run on the driver can still use the
@@ -1569,7 +1580,7 @@ class SparkContext(config: SparkConf) extends Logging {
postEnvironmentUpdate()
} else {
logWarning(s"The path $path has been added already. Overwriting of added paths " +
- "is not supported in the current version.")
+ "is not supported in the current version.")
}
}
@@ -1840,6 +1851,10 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addJar(path: String): Unit = {
+ addJar(path, false)
+ }
+
+ private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
def addLocalJarFile(file: File): String = {
try {
if (!file.exists()) {
@@ -1904,7 +1919,7 @@ class SparkContext(config: SparkConf) extends Logging {
}
}
if (key != null) {
- val timestamp = System.currentTimeMillis
+ val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d220029..9a191e1 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -220,6 +220,20 @@ private[spark] class Executor(
heartbeater.start()
+ private val appStartTime = conf.getLong("spark.app.startTime", 0)
+
+ // To allow users to distribute plugins and their required files
+ // specified by --jars and --files on application submission, those jars/files should be
+ // downloaded and added to the class loader via updateDependencies.
+ // This should be done before plugin initialization below
+ // because executors search plugins from the class loader and initialize them.
+ private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
+ conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
+ Map(urls.split(",").map(url => (url, appStartTime)): _*)
+ }.getOrElse(Map.empty)
+ }
+ updateDependencies(initialUserFiles, initialUserJars)
+
// Plugins need to load using a class loader that includes the executor's user classpath.
// Plugins also needs to be initialized after the heartbeater started
// to avoid blocking to send heartbeat (see SPARK-32175).
diff --git a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
index 0f489fb..b188ee1 100644
--- a/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaSparkContextSuite.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.apache.spark.api.java.*;
import org.apache.spark.*;
+import org.apache.spark.util.Utils;
/**
* Java apps can use both Java-friendly JavaSparkContext and Scala SparkContext.
@@ -35,14 +36,16 @@ import org.apache.spark.*;
public class JavaSparkContextSuite implements Serializable {
@Test
- public void javaSparkContext() {
+ public void javaSparkContext() throws IOException {
+ File tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark");
+ String dummyJarFile = File.createTempFile(tempDir.toString(), "jarFile").toString();
String[] jars = new String[] {};
java.util.Map<String, String> environment = new java.util.HashMap<>();
new JavaSparkContext(new SparkConf().setMaster("local").setAppName("name")).stop();
new JavaSparkContext("local", "name", new SparkConf()).stop();
new JavaSparkContext("local", "name").stop();
- new JavaSparkContext("local", "name", "sparkHome", "jarFile").stop();
+ new JavaSparkContext("local", "name", "sparkHome", dummyJarFile).stop();
new JavaSparkContext("local", "name", "sparkHome", jars).stop();
new JavaSparkContext("local", "name", "sparkHome", jars, environment).stop();
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 35311d3..b5b3751 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,7 +570,8 @@ class SparkSubmitSuite
}
}
- val clArgs2 = Seq("--class", "org.SomeClass", "thejar.jar")
+ val dummyJarFile = TestUtils.createJarWithClasses(Seq.empty)
+ val clArgs2 = Seq("--class", "org.SomeClass", dummyJarFile.toString)
val appArgs2 = new SparkSubmitArguments(clArgs2)
val (_, _, conf2, _) = submit.prepareSubmitEnvironment(appArgs2)
assert(!conf2.contains(UI_SHOW_CONSOLE_PROGRESS))
@@ -1220,6 +1221,86 @@ class SparkSubmitSuite
testRemoteResources(enableHttpFs = true, forceDownloadSchemes = Seq("*"))
}
+ test("SPARK-32119: Jars and files should be loaded when Executors launch for plugins") {
+ val tempDir = Utils.createTempDir()
+ val tempFileName = "test.txt"
+ val tempFile = new File(tempDir, tempFileName)
+
+ // scalastyle:off println
+ Utils.tryWithResource {
+ new PrintWriter(tempFile)
+ } { writer =>
+ writer.println("SparkPluginTest")
+ }
+ // scalastyle:on println
+
+ val sparkPluginCodeBody =
+ """
+ |@Override
+ |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
+ | return new TestExecutorPlugin();
+ |}
+ |
+ |@Override
+ |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
+ """.stripMargin
+ val executorPluginCodeBody =
+ s"""
+ |@Override
+ |public void init(
+ | org.apache.spark.api.plugin.PluginContext ctx,
+ | java.util.Map<String, String> extraConf) {
+ | String str = null;
+ | try (java.io.BufferedReader reader =
+ | new java.io.BufferedReader(new java.io.InputStreamReader(
+ | new java.io.FileInputStream("$tempFileName")))) {
+ | str = reader.readLine();
+ | } catch (java.io.IOException e) {
+ | throw new RuntimeException(e);
+ | } finally {
+ | assert str == "SparkPluginTest";
+ | }
+ |}
+ """.stripMargin
+
+ val compiledExecutorPlugin = TestUtils.createCompiledClass(
+ "TestExecutorPlugin",
+ tempDir,
+ "",
+ null,
+ Seq.empty,
+ Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
+ executorPluginCodeBody)
+
+ val thisClassPath =
+ sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
+ val compiledSparkPlugin = TestUtils.createCompiledClass(
+ "TestSparkPlugin",
+ tempDir,
+ "",
+ null,
+ Seq(tempDir.toURI.toURL) ++ thisClassPath,
+ Seq("org.apache.spark.api.plugin.SparkPlugin"),
+ sparkPluginCodeBody)
+
+ val jarUrl = TestUtils.createJar(
+ Seq(compiledSparkPlugin, compiledExecutorPlugin),
+ new File(tempDir, "testplugin.jar"))
+
+ val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+ val unusedFile = Files.createTempFile(tempDir.toPath, "unused", null)
+ val args = Seq(
+ "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+ "--name", "testApp",
+ "--master", "local-cluster[1,1,1024]",
+ "--conf", "spark.plugins=TestSparkPlugin",
+ "--conf", "spark.ui.enabled=false",
+ "--jars", jarUrl.toString + "," + unusedJar.toString,
+ "--files", tempFile.toString + "," + unusedFile.toString,
+ unusedJar.toString)
+ runSparkSubmit(args)
+ }
+
private def testRemoteResources(
enableHttpFs: Boolean,
forceDownloadSchemes: Seq[String] = Nil): Unit = {