You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/05/11 15:51:21 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5337] Spark scope
mode is broken
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new e446d47 [ZEPPELIN-5337] Spark scope mode is broken
e446d47 is described below
commit e446d4721825f547354d882d2c734e823f510c42
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon May 3 00:00:06 2021 +0800
[ZEPPELIN-5337] Spark scope mode is broken
### What is this PR for?
Spark scoped mode is broken because a different Scala shell output dir is used for each Spark Scala shell. This PR fixes the issue by using a shared output folder for all the Scala shell instances.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5337
### How should this be tested?
* UT is added
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4105 from zjffdu/ZEPPELIN-5337 and squashes the following commits:
1cf75223c [Jeff Zhang] [ZEPPELIN-5337] Spark scope mode is broken
(cherry picked from commit f3b16ad5e3a60e1fcc682df504ba72f712995622)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../apache/zeppelin/spark/SparkInterpreter.java | 19 ++++++-
.../zeppelin/spark/SparkScala210Interpreter.scala | 7 ++-
.../zeppelin/spark/SparkScala211Interpreter.scala | 9 ++--
.../zeppelin/spark/SparkScala212Interpreter.scala | 8 ++-
.../zeppelin/spark/BaseSparkScalaInterpreter.scala | 2 -
.../zeppelin/integration/SparkIntegrationTest.java | 59 +++++++++++++++++++---
6 files changed, 79 insertions(+), 25 deletions(-)
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 5e10e14..7b1460a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -33,8 +33,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -49,6 +52,18 @@ import java.util.concurrent.atomic.AtomicInteger;
public class SparkInterpreter extends AbstractInterpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreter.class);
+ private static File scalaShellOutputDir;
+
+ static {
+ try {
+ // scala shell output will be shared between multiple spark scala shell, so use static field
+ scalaShellOutputDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "spark")
+ .toFile();
+ scalaShellOutputDir.deleteOnExit();
+ } catch (IOException e) {
+ throw new RuntimeException("Fail to create scala shell output dir", e);
+ }
+ }
private static AtomicInteger SESSION_NUM = new AtomicInteger(0);
private AbstractSparkScalaInterpreter innerInterpreter;
@@ -158,8 +173,8 @@ public class SparkInterpreter extends AbstractInterpreter {
String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
Class clazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName);
return (AbstractSparkScalaInterpreter)
- clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class)
- .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader);
+ clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class, File.class)
+ .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader, scalaShellOutputDir);
}
@Override
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index c093636..34d69c0 100644
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -40,7 +40,8 @@ class SparkScala210Interpreter(override val conf: SparkConf,
override val depFiles: java.util.List[String],
override val properties: Properties,
override val interpreterGroup: InterpreterGroup,
- override val sparkInterpreterClassLoader: URLClassLoader)
+ override val sparkInterpreterClassLoader: URLClassLoader,
+ val outputDir: File)
extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
@@ -57,9 +58,7 @@ class SparkScala210Interpreter(override val conf: SparkConf,
if (InterpreterContext.get() != null) {
interpreterOutput.setInterpreterOutput(InterpreterContext.get().out)
}
- val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
- this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
- outputDir.deleteOnExit()
+
LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
// Only Spark1 requires to create http server, Spark2 removes HttpServer class.
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 3c3943b..6f531d2 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -40,7 +40,8 @@ class SparkScala211Interpreter(override val conf: SparkConf,
override val depFiles: java.util.List[String],
override val properties: Properties,
override val interpreterGroup: InterpreterGroup,
- override val sparkInterpreterClassLoader: URLClassLoader)
+ override val sparkInterpreterClassLoader: URLClassLoader,
+ val outputDir: File)
extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
import SparkScala211Interpreter._
@@ -56,12 +57,10 @@ class SparkScala211Interpreter(override val conf: SparkConf,
if (sparkMaster == "yarn-client") {
System.setProperty("SPARK_YARN_MODE", "true")
}
- // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
- val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
- this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+
LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
- outputDir.deleteOnExit()
conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+ // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
startHttpServer(outputDir).foreach { case (server, uri) =>
sparkHttpServer = server
conf.set("spark.repl.class.uri", uri)
diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index e467336..e9c127d 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -40,7 +40,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
override val depFiles: java.util.List[String],
override val properties: Properties,
override val interpreterGroup: InterpreterGroup,
- override val sparkInterpreterClassLoader: URLClassLoader)
+ override val sparkInterpreterClassLoader: URLClassLoader,
+ val outputDir: File)
extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
@@ -54,11 +55,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
if (sparkMaster == "yarn-client") {
System.setProperty("SPARK_YARN_MODE", "true")
}
- // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
- val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
- this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+
LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
- outputDir.deleteOnExit()
conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
val settings = new Settings()
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index eb99040..df3ca6d 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -63,8 +63,6 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
protected var sparkSession: Object = _
- protected var outputDir: File = _
-
protected var userJars: Seq[String] = _
protected var sparkHttpServer: Object = _
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 890d5a3..5fefa6b 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
+import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
@@ -47,6 +48,7 @@ import java.io.IOException;
import java.util.EnumSet;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@@ -253,14 +255,57 @@ public abstract class SparkIntegrationTest {
@Test
public void testSparkSubmit() throws InterpreterException {
- InterpreterSetting sparkSubmitInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark-submit");
- sparkSubmitInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
- // test SparkSubmitInterpreter
- InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
- Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark-submit", new ExecutionContext("user1", "note1", "test"));
- InterpreterResult interpreterResult = sparkSubmitInterpreter.interpret("--class org.apache.spark.examples.SparkPi " + sparkHome + "/examples/jars/spark-examples*.jar ", context);
+ try {
+ InterpreterSetting sparkSubmitInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark-submit");
+ sparkSubmitInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
+ // test SparkSubmitInterpreter
+ InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+ Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark-submit", new ExecutionContext("user1", "note1", "test"));
+ InterpreterResult interpreterResult = sparkSubmitInterpreter.interpret("--class org.apache.spark.examples.SparkPi " + sparkHome + "/examples/jars/spark-examples*.jar ", context);
+
+ assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+ } finally {
+ interpreterSettingManager.close();
+ }
+ }
- assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+ @Test
+ public void testScopedMode() throws InterpreterException {
+ InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+ try {
+ sparkInterpreterSetting.setProperty("spark.master", "local[*]");
+ sparkInterpreterSetting.setProperty("spark.submit.deployMode", "client");
+ sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
+ sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+ sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+ sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+ sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
+ sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+ sparkInterpreterSetting.getOption().setPerNote(InterpreterOption.SCOPED);
+
+
+ Interpreter sparkInterpreter1 = interpreterFactory.getInterpreter("spark.spark", new ExecutionContext("user1", "note1", "test"));
+
+ InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+ InterpreterResult interpreterResult = sparkInterpreter1.interpret("sc.range(1,10).map(e=>e+1).sum()", context);
+ assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+ assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("54"));
+
+ Interpreter sparkInterpreter2 = interpreterFactory.getInterpreter("spark.spark", new ExecutionContext("user1", "note2", "test"));
+ assertNotEquals(sparkInterpreter1, sparkInterpreter2);
+
+ context = new InterpreterContext.Builder().setNoteId("note2").setParagraphId("paragraph_1").build();
+ interpreterResult = sparkInterpreter2.interpret("sc.range(1,10).map(e=>e+1).sum()", context);
+ assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+ assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("54"));
+ } finally {
+ interpreterSettingManager.close();
+
+ if (sparkInterpreterSetting != null) {
+ // reset InterpreterOption so that it won't affect other tests.
+ sparkInterpreterSetting.getOption().setPerNote(InterpreterOption.SHARED);
+ }
+ }
}
private boolean isSpark2() {