Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/05/11 15:51:21 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5337] Spark scope mode is broken

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new e446d47  [ZEPPELIN-5337] Spark scope mode is broken
e446d47 is described below

commit e446d4721825f547354d882d2c734e823f510c42
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon May 3 00:00:06 2021 +0800

    [ZEPPELIN-5337] Spark scope mode is broken
    
    ### What is this PR for?
    
    Spark scope mode is broken because each Spark Scala shell uses a different Scala shell output dir. This PR fixes the issue by using a shared output folder for all Scala shell instances.
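
    The core of the fix, sketched as standalone Java (the class name SharedOutputDirExample and its getter are illustrative, not part of the patch; the actual change is the static block in SparkInterpreter.java shown in the diff below): the output dir is created once per JVM in a static initializer and handed to every Scala shell instance, instead of each shell creating its own temp dir.

        import java.io.File;
        import java.io.IOException;
        import java.nio.file.Files;
        import java.nio.file.Paths;

        public class SharedOutputDirExample {
          // One shared REPL output dir per JVM, reused by every Scala shell instance.
          private static final File SCALA_SHELL_OUTPUT_DIR;

          static {
            try {
              SCALA_SHELL_OUTPUT_DIR = Files
                  .createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "spark")
                  .toFile();
              SCALA_SHELL_OUTPUT_DIR.deleteOnExit();
            } catch (IOException e) {
              throw new RuntimeException("Failed to create scala shell output dir", e);
            }
          }

          // Every interpreter reads the same dir, e.g. to set spark.repl.class.outputDir.
          public static File getScalaShellOutputDir() {
            return SCALA_SHELL_OUTPUT_DIR;
          }
        }

    Because the field is static, all interpreters in the same JVM (scoped mode runs them in one process) write and read REPL-generated classes from the same directory, which is what spark.repl.class.outputDir expects.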
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5337
    
    ### How should this be tested?
    * An integration test is added (SparkIntegrationTest#testScopedMode)
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4105 from zjffdu/ZEPPELIN-5337 and squashes the following commits:
    
    1cf75223c [Jeff Zhang] [ZEPPELIN-5337] Spark scope mode is broken
    
    (cherry picked from commit f3b16ad5e3a60e1fcc682df504ba72f712995622)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../apache/zeppelin/spark/SparkInterpreter.java    | 19 ++++++-
 .../zeppelin/spark/SparkScala210Interpreter.scala  |  7 ++-
 .../zeppelin/spark/SparkScala211Interpreter.scala  |  9 ++--
 .../zeppelin/spark/SparkScala212Interpreter.scala  |  8 ++-
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala |  2 -
 .../zeppelin/integration/SparkIntegrationTest.java | 59 +++++++++++++++++++---
 6 files changed, 79 insertions(+), 25 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 5e10e14..7b1460a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -33,8 +33,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -49,6 +52,18 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class SparkInterpreter extends AbstractInterpreter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreter.class);
+  private static File scalaShellOutputDir;
+
+  static {
+    try {
+      // scala shell output will be shared between multiple spark scala shell, so use static field
+      scalaShellOutputDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "spark")
+              .toFile();
+      scalaShellOutputDir.deleteOnExit();
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to create scala shell output dir", e);
+    }
+  }
 
   private static AtomicInteger SESSION_NUM = new AtomicInteger(0);
   private AbstractSparkScalaInterpreter innerInterpreter;
@@ -158,8 +173,8 @@ public class SparkInterpreter extends AbstractInterpreter {
     String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
     Class clazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName);
     return (AbstractSparkScalaInterpreter)
-            clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class)
-                    .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader);
+            clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class, File.class)
+                    .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader, scalaShellOutputDir);
   }
 
   @Override
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index c093636..34d69c0 100644
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -40,7 +40,8 @@ class SparkScala210Interpreter(override val conf: SparkConf,
                                override val depFiles: java.util.List[String],
                                override val properties: Properties,
                                override val interpreterGroup: InterpreterGroup,
-                               override val sparkInterpreterClassLoader: URLClassLoader)
+                               override val sparkInterpreterClassLoader: URLClassLoader,
+                               val outputDir: File)
   extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
   lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
@@ -57,9 +58,7 @@ class SparkScala210Interpreter(override val conf: SparkConf,
     if (InterpreterContext.get() != null) {
       interpreterOutput.setInterpreterOutput(InterpreterContext.get().out)
     }
-    val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
-    outputDir.deleteOnExit()
+
     LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
     // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 3c3943b..6f531d2 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -40,7 +40,8 @@ class SparkScala211Interpreter(override val conf: SparkConf,
                                override val depFiles: java.util.List[String],
                                override val properties: Properties,
                                override val interpreterGroup: InterpreterGroup,
-                               override val sparkInterpreterClassLoader: URLClassLoader)
+                               override val sparkInterpreterClassLoader: URLClassLoader,
+                               val outputDir: File)
   extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
   import SparkScala211Interpreter._
@@ -56,12 +57,10 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     if (sparkMaster == "yarn-client") {
       System.setProperty("SPARK_YARN_MODE", "true")
     }
-    // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
-    val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+
     LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
-    outputDir.deleteOnExit()
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+    // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
     startHttpServer(outputDir).foreach { case (server, uri) =>
       sparkHttpServer = server
       conf.set("spark.repl.class.uri", uri)
diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index e467336..e9c127d 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -40,7 +40,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
                                override val depFiles: java.util.List[String],
                                override val properties: Properties,
                                override val interpreterGroup: InterpreterGroup,
-                               override val sparkInterpreterClassLoader: URLClassLoader)
+                               override val sparkInterpreterClassLoader: URLClassLoader,
+                               val outputDir: File)
   extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
   lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
@@ -54,11 +55,8 @@ class SparkScala212Interpreter(override val conf: SparkConf,
     if (sparkMaster == "yarn-client") {
       System.setProperty("SPARK_YARN_MODE", "true")
     }
-    // Only Spark1 requires to create http server, Spark2 removes HttpServer class.
-    val rootDir = conf.get("spark.repl.classdir", System.getProperty("java.io.tmpdir"))
-    this.outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
+
     LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
-    outputDir.deleteOnExit()
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
 
     val settings = new Settings()
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index eb99040..df3ca6d 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -63,8 +63,6 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
   protected var sparkSession: Object = _
 
-  protected var outputDir: File = _
-
   protected var userJars: Seq[String] = _
 
   protected var sparkHttpServer: Object = _
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 890d5a3..5fefa6b 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
+import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
@@ -47,6 +48,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -253,14 +255,57 @@ public abstract class SparkIntegrationTest {
 
   @Test
   public void testSparkSubmit() throws InterpreterException {
-    InterpreterSetting sparkSubmitInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark-submit");
-    sparkSubmitInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
-    // test SparkSubmitInterpreter
-    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
-    Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark-submit", new ExecutionContext("user1", "note1", "test"));
-    InterpreterResult interpreterResult = sparkSubmitInterpreter.interpret("--class org.apache.spark.examples.SparkPi " + sparkHome + "/examples/jars/spark-examples*.jar ", context);
+    try {
+      InterpreterSetting sparkSubmitInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark-submit");
+      sparkSubmitInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
+      // test SparkSubmitInterpreter
+      InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+      Interpreter sparkSubmitInterpreter = interpreterFactory.getInterpreter("spark-submit", new ExecutionContext("user1", "note1", "test"));
+      InterpreterResult interpreterResult = sparkSubmitInterpreter.interpret("--class org.apache.spark.examples.SparkPi " + sparkHome + "/examples/jars/spark-examples*.jar ", context);
+
+      assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    } finally {
+      interpreterSettingManager.close();
+    }
+  }
 
-    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+  @Test
+  public void testScopedMode() throws InterpreterException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    try {
+      sparkInterpreterSetting.setProperty("spark.master", "local[*]");
+      sparkInterpreterSetting.setProperty("spark.submit.deployMode", "client");
+      sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
+      sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+      sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+      sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+      sparkInterpreterSetting.setProperty("zeppelin.spark.scala.color", "false");
+      sparkInterpreterSetting.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+      sparkInterpreterSetting.getOption().setPerNote(InterpreterOption.SCOPED);
+
+
+      Interpreter sparkInterpreter1 = interpreterFactory.getInterpreter("spark.spark", new ExecutionContext("user1", "note1", "test"));
+
+      InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+      InterpreterResult interpreterResult = sparkInterpreter1.interpret("sc.range(1,10).map(e=>e+1).sum()", context);
+      assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+      assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("54"));
+
+      Interpreter sparkInterpreter2 = interpreterFactory.getInterpreter("spark.spark", new ExecutionContext("user1", "note2", "test"));
+      assertNotEquals(sparkInterpreter1, sparkInterpreter2);
+
+      context = new InterpreterContext.Builder().setNoteId("note2").setParagraphId("paragraph_1").build();
+      interpreterResult = sparkInterpreter2.interpret("sc.range(1,10).map(e=>e+1).sum()", context);
+      assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+      assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("54"));
+    } finally {
+      interpreterSettingManager.close();
+
+      if (sparkInterpreterSetting != null) {
+        // reset InterpreterOption so that it won't affect other tests.
+        sparkInterpreterSetting.getOption().setPerNote(InterpreterOption.SHARED);
+      }
+    }
   }
 
   private boolean isSpark2() {