Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/04/15 02:07:34 UTC

[zeppelin] branch master updated: [ZEPPELIN-5702] Refactor spark interpreter module structure (#4344)

This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new f5040688c0 [ZEPPELIN-5702] Refactor spark interpreter module structure (#4344)
f5040688c0 is described below

commit f5040688c034e6c8350277c05fa7d1b4cc0f1b59
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Apr 15 10:07:26 2022 +0800

    [ZEPPELIN-5702] Refactor spark interpreter module structure (#4344)
---
 spark/README.md                                    |  28 ++
 .../spark/AbstractSparkScalaInterpreter.java       | 305 ++++++++++++++++--
 .../apache/zeppelin/spark/SparkInterpreter.java    |  26 +-
 .../apache/zeppelin/spark/SparkRInterpreter.java   |   5 -
 .../apache/zeppelin/spark/SparkSqlInterpreter.java |   9 +-
 .../main/java/org/apache/zeppelin/spark/Utils.java |  72 +----
 .../zeppelin/spark/SparkInterpreterTest.java       |  11 +-
 spark/scala-2.11/spark-scala-parent                |   1 -
 .../zeppelin/spark/SparkScala211Interpreter.scala  | 265 ++++++++++------
 .../zeppelin/spark/SparkZeppelinContext.scala      |   1 -
 spark/scala-2.12/spark-scala-parent                |   1 -
 .../zeppelin/spark/SparkScala212Interpreter.scala  | 234 +++++++++-----
 .../zeppelin/spark/SparkZeppelinContext.scala      |   1 -
 spark/scala-2.13/spark-scala-parent                |   1 -
 .../zeppelin/spark/SparkScala213Interpreter.scala  | 189 ++++++++----
 .../zeppelin/spark/SparkZeppelinContext.scala      |   1 -
 spark/spark-scala-parent/pom.xml                   |  59 ----
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 341 ---------------------
 .../apache/zeppelin/spark/JobProgressUtil.scala    |  49 ---
 .../java/org/apache/zeppelin/spark/SparkShims.java |   1 -
 .../integration/ZSessionIntegrationTest.java       |   4 +-
 21 files changed, 768 insertions(+), 836 deletions(-)

diff --git a/spark/README.md b/spark/README.md
new file mode 100644
index 0000000000..a9b039ec5e
--- /dev/null
+++ b/spark/README.md
@@ -0,0 +1,28 @@
+# Spark Interpreter
+
+The Spark interpreter is the first and most important interpreter of Zeppelin. It supports multiple versions of Spark and multiple versions of Scala.
+
+
+# Module structure of Spark interpreter
+
+* interpreter
+  - This module is the entry module of the Spark interpreter. All the interpreters are defined here. SparkInterpreter is the most important one:
+  SparkContext/SparkSession is created here, and the other interpreters (PySparkInterpreter, IPySparkInterpreter, SparkRInterpreter, etc.) all depend on SparkInterpreter.
+  Because Scala versions are not binary compatible, there are several scala-x modules, one for each supported Scala version.
+  Because Spark versions are not fully compatible with each other, there are several spark-shims modules, one for each supported major Spark version.
+* spark-scala-parent   
+  - Parent module for each Scala module
+* scala-2.11   
+  - Scala module for Scala 2.11
+* scala-2.12
+  - Scala module for Scala 2.12
+* scala-2.13
+  - Scala module for Scala 2.13
+* spark-shims 
+  - Parent module for each Spark module
+* spark2-shims
+  - Shims module for Spark2
+* spark3-shims
+  - Shims module for Spark3
+
+
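The module layout described in this README is wired together at runtime: the interpreter module detects the Scala version on the classpath and instantiates the matching SparkScala211/212/213Interpreter from the corresponding scala-x module through reflection (see the newInstance(...) call in SparkInterpreter.java further down in this diff). The sketch below only illustrates that selection step and is not part of this commit; the ScalaModuleSelector class and its method names are hypothetical, while the constructor signature mirrors the one used in the diff.

// Hypothetical sketch (not part of this commit): selecting the scala-x module's
// interpreter implementation by Scala version and instantiating it via reflection.
import java.io.File;
import java.lang.reflect.Constructor;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.zeppelin.interpreter.InterpreterGroup;

class ScalaModuleSelector {

  // "2.11" -> org.apache.zeppelin.spark.SparkScala211Interpreter, and so on.
  static String innerClassName(String scalaVersion) {
    return "org.apache.zeppelin.spark.SparkScala" + scalaVersion.replace(".", "") + "Interpreter";
  }

  // Mirrors the constructor used in this diff:
  // (SparkConf, List<String> depFiles, Properties, InterpreterGroup, URLClassLoader, File outputDir)
  static Object createInnerInterpreter(String scalaVersion,
                                       SparkConf conf,
                                       List<String> depFiles,
                                       Properties properties,
                                       InterpreterGroup group,
                                       URLClassLoader classLoader,
                                       File outputDir) throws Exception {
    Class<?> clazz = Class.forName(innerClassName(scalaVersion), true, classLoader);
    Constructor<?> ctor = clazz.getConstructor(SparkConf.class, List.class, Properties.class,
        InterpreterGroup.class, URLClassLoader.class, File.class);
    return ctor.newInstance(conf, depFiles, properties, group, classLoader, outputDir);
  }
}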
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
index 71acd5e467..bc58ea7461 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
@@ -17,17 +17,32 @@
 
 package org.apache.zeppelin.spark;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
 import org.apache.spark.sql.SQLContext;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.spark.sql.SparkSession;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.kotlin.KotlinInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /**
  * This is a bridge class which bridges the communication between the Java side and the Scala side.
@@ -35,44 +50,286 @@ import java.util.List;
  */
 public abstract class AbstractSparkScalaInterpreter {
 
-  public abstract SparkContext getSparkContext();
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSparkScalaInterpreter.class);
+  private static final AtomicInteger SESSION_NUM = new AtomicInteger(0);
 
-  public abstract SQLContext getSqlContext();
+  protected SparkConf conf;
+  protected SparkContext sc;
+  protected SparkSession sparkSession;
+  protected SQLContext sqlContext;
+  protected String sparkUrl;
+  protected ZeppelinContext z;
 
-  public abstract Object getSparkSession();
+  protected Properties properties;
+  protected List<String> depFiles;
 
-  public abstract String getSparkUrl();
+  public AbstractSparkScalaInterpreter(SparkConf conf,
+                                       Properties properties,
+                                       List<String> depFiles) {
+    this.conf = conf;
+    this.properties = properties;
+    this.depFiles = depFiles;
+  }
 
-  public abstract ZeppelinContext getZeppelinContext();
+  public SparkContext getSparkContext() {
+    return this.sc;
+  }
 
-  public int getProgress(InterpreterContext context) throws InterpreterException {
-    return getProgress(Utils.buildJobGroupId(context), context);
+  public SQLContext getSqlContext() {
+    return this.sqlContext;
   }
 
-  public abstract int getProgress(String jobGroup,
-                                  InterpreterContext context) throws InterpreterException;
+  public SparkSession getSparkSession() {
+    return this.sparkSession;
+  }
 
-  public void cancel(InterpreterContext context) throws InterpreterException {
-    getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context));
+  public String getSparkUrl() {
+    return this.sparkUrl;
   }
 
-  public Interpreter.FormType getFormType() throws InterpreterException {
-    return Interpreter.FormType.SIMPLE;
+  public ZeppelinContext getZeppelinContext() {
+    return this.z;
   }
 
-  public abstract void open();
+  public AbstractSparkScalaInterpreter() {
+  }
+
+  public void open() throws InterpreterException {
+    /* Required for scoped mode.
+     * In scoped mode, multiple Scala compilers (REPLs) generate classes in the same directory.
+     * Class names are not randomly generated and look like '$line12.$read$$iw$$iw'.
+     * Therefore a generated class may conflict with (overwrite) a class generated by
+     * another REPL.
+     *
+     * To prevent such generated class name conflicts,
+     * the prefix of the generated class names is changed for each Scala compiler (REPL) instance.
+     *
+     * In Spark 2.x, the REPL-generated wrapper class name must be compatible with the pattern
+     * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
+     *
+     * As hashCode() can return a negative integer value and the minus character '-' is invalid
+     * in a package name, we change it to the numeric character '0', which still conforms to the regexp.
+     */
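+    // For example, a hashCode() of -1234567 yields the prefix "$line01234567" instead of the invalid "$line-1234567".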
+    System.setProperty("scala.repl.name.line", ("$line" + this.hashCode()).replace('-', '0'));
+    SESSION_NUM.incrementAndGet();
 
-  public abstract void close();
+    createSparkILoop();
+    createSparkContext();
+    createZeppelinContext();
+  }
 
-  public abstract InterpreterResult interpret(String st, InterpreterContext context);
+  public void close() throws InterpreterException {
+    // delete stagingDir for yarn mode
+    if (getSparkMaster().startsWith("yarn")) {
+      YarnConfiguration hadoopConf = new YarnConfiguration();
+      Path appStagingBaseDir = null;
+      if (conf.contains("spark.yarn.stagingDir")) {
+        appStagingBaseDir = new Path(conf.get("spark.yarn.stagingDir"));
+      } else {
+        try {
+          appStagingBaseDir = FileSystem.get(hadoopConf).getHomeDirectory();
+        } catch (IOException e) {
+          LOGGER.error("Fail to get stagingBaseDir", e);
+        }
+      }
+      if (appStagingBaseDir != null) {
+        Path stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging" + "/" + sc.applicationId());
+        cleanupStagingDirInternal(stagingDirPath, hadoopConf);
+      }
+    }
+
+    if (sc != null) {
+      sc.stop();
+      sc = null;
+    }
+    if (sparkSession != null) {
+      sparkSession.stop();
+      sparkSession = null;
+    }
+    sqlContext = null;
+    z = null;
+  }
+
+  public abstract void createSparkILoop() throws InterpreterException;
+
+  public abstract void createZeppelinContext() throws InterpreterException;
+
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context));
+  }
+
+  public abstract InterpreterResult interpret(String st,
+                                              InterpreterContext context) throws InterpreterException;
 
   public abstract InterpreterResult delegateInterpret(KotlinInterpreter kotlinInterpreter,
                                                       String st,
-                                                      InterpreterContext context);
+                                                      InterpreterContext context) throws InterpreterException;
 
   public abstract List<InterpreterCompletion> completion(String buf,
                                                          int cursor,
-                                                         InterpreterContext interpreterContext);
+                                                         InterpreterContext interpreterContext) throws InterpreterException;
+
+  public abstract void bind(String name,
+                            String tpe,
+                            Object value,
+                            List<String> modifier);
+
+  // Throws an exception when the code fails to execute in the Scala shell. Only used during
+  // initialization, not to run user code.
+  public abstract void scalaInterpretQuietly(String code) throws InterpreterException;
 
   public abstract ClassLoader getScalaShellClassLoader();
+
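+  // Only the jar dependencies are passed to the SparkContext (via sc.addFile in createSparkContext).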
+  private List<String> getUserFiles() {
+    return depFiles.stream()
+            .filter(f -> f.endsWith(".jar"))
+            .collect(Collectors.toList());
+  }
+
+  private void createSparkContext() throws InterpreterException {
+    SparkSession.Builder builder = SparkSession.builder().config(conf);
+    if (conf.get("spark.sql.catalogImplementation", "in-memory").equalsIgnoreCase("hive")
+            || conf.get("zeppelin.spark.useHiveContext", "false").equalsIgnoreCase("true")) {
+      boolean hiveSiteExisted =
+              Thread.currentThread().getContextClassLoader().getResource("hive-site.xml") != null;
+      if (hiveSiteExisted && hiveClassesArePresent()) {
+        sparkSession = builder.enableHiveSupport().getOrCreate();
+        LOGGER.info("Created Spark session (with Hive support)");
+      } else {
+        if (!hiveClassesArePresent()) {
+          LOGGER.warn("Hive support can not be enabled because spark is not built with hive");
+        }
+        if (!hiveSiteExisted) {
+          LOGGER.warn("Hive support can not be enabled because no hive-site.xml found");
+        }
+        sparkSession = builder.getOrCreate();
+        LOGGER.info("Created Spark session (without Hive support)");
+      }
+    } else {
+      sparkSession = builder.getOrCreate();
+      LOGGER.info("Created Spark session (without Hive support)");
+    }
+
+    sc = sparkSession.sparkContext();
+    getUserFiles().forEach(file -> sc.addFile(file));
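+    // uiWebUrl() is empty when the Spark web UI is disabled (spark.ui.enabled=false).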
+    if (sc.uiWebUrl().isDefined()) {
+      sparkUrl = sc.uiWebUrl().get();
+    }
+    sqlContext = sparkSession.sqlContext();
+
+    initAndSendSparkWebUrl();
+
+    bind("spark", sparkSession.getClass().getCanonicalName(), sparkSession, Lists.newArrayList("@transient"));
+    bind("sc", "org.apache.spark.SparkContext", sc, Lists.newArrayList("@transient"));
+    bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, Lists.newArrayList("@transient"));
+
+    scalaInterpretQuietly("import org.apache.spark.SparkContext._");
+    scalaInterpretQuietly("import spark.implicits._");
+    scalaInterpretQuietly("import sqlContext.implicits._");
+    scalaInterpretQuietly("import spark.sql");
+    scalaInterpretQuietly("import org.apache.spark.sql.functions._");
+    // Print an empty string, otherwise the output of the last statement of this method
+    // (i.e. import org.apache.spark.sql.functions._) would mix with the output of user code.
+    scalaInterpretQuietly("print(\"\")");
+  }
+
+  /**
+   * @return true if Hive classes can be loaded, otherwise false.
+   */
+  private boolean hiveClassesArePresent() {
+    try {
+      Class.forName("org.apache.spark.sql.hive.HiveSessionStateBuilder");
+      Class.forName("org.apache.hadoop.hive.conf.HiveConf");
+      return true;
+    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+      return false;
+    }
+  }
+
+  private void initAndSendSparkWebUrl() {
+    String webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
+    if (!StringUtils.isBlank(webUiUrl)) {
+      this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId());
+    } else {
+      useYarnProxyURLIfNeeded();
+    }
+    InterpreterContext.get().getIntpEventClient().sendWebUrlInfo(this.sparkUrl);
+  }
+
+  private String getSparkMaster() {
+    if (conf == null) {
+      return "";
+    } else {
+      return conf.get(SparkStringConstants.MASTER_PROP_NAME,
+              SparkStringConstants.DEFAULT_MASTER_VALUE);
+    }
+  }
+
+  private void cleanupStagingDirInternal(Path stagingDirPath, Configuration hadoopConf) {
+    try {
+      FileSystem fs = stagingDirPath.getFileSystem(hadoopConf);
+      if (fs.delete(stagingDirPath, true)) {
+        LOGGER.info("Deleted staging directory " + stagingDirPath);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to cleanup staging dir " + stagingDirPath, e);
+    }
+  }
+
+  private void useYarnProxyURLIfNeeded() {
+    if (Boolean.parseBoolean(properties.getProperty("spark.webui.yarn.useProxy", "false"))) {
+      if (getSparkMaster().startsWith("yarn")) {
+        String appId = sc.applicationId();
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        YarnConfiguration yarnConf = new YarnConfiguration();
+        // disable timeline service as we only query yarn app here.
+        // Otherwise we may hit this kind of ERROR:
+        // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
+        yarnConf.set("yarn.timeline-service.enabled", "false");
+        yarnClient.init(yarnConf);
+        yarnClient.start();
+        ApplicationReport appReport = null;
+        try {
+          appReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
+          this.sparkUrl = appReport.getTrackingUrl();
+        } catch (YarnException | IOException e) {
+          LOGGER.error("Fail to get yarn app report", e);
+        }
+      }
+    }
+  }
+
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    String jobGroup = Utils.buildJobGroupId(context);
+    // Each paragraph has one unique jobGroup, and one paragraph may run multiple times.
+    // So only look for the first job which matches the jobGroup.
+    Optional<SparkJobInfo> jobInfoOptional = Arrays.stream(sc.statusTracker().getJobIdsForGroup(jobGroup))
+            .mapToObj(jobId -> sc.statusTracker().getJobInfo(jobId))
+            .filter(jobInfo -> jobInfo.isDefined())
+            .map(jobInfo -> jobInfo.get())
+            .findFirst();
+    if (jobInfoOptional.isPresent()) {
+      List<SparkStageInfo> stageInfoList = Arrays.stream(jobInfoOptional.get().stageIds())
+              .mapToObj(stageId -> sc.statusTracker().getStageInfo(stageId))
+              .filter(stageInfo -> stageInfo.isDefined())
+              .map(stageInfo -> stageInfo.get())
+              .collect(Collectors.toList());
+      int taskCount = stageInfoList.stream()
+              .map(stageInfo -> stageInfo.numTasks())
+              .collect(Collectors.summingInt(Integer::intValue));
+      int completedTaskCount = stageInfoList.stream()
+              .map(stageInfo -> stageInfo.numCompletedTasks())
+              .collect(Collectors.summingInt(Integer::intValue));
+      LOGGER.debug("Total TaskCount: " + taskCount);
+      LOGGER.debug("Completed TaskCount: " + completedTaskCount);
+      if (taskCount == 0) {
+        return 0;
+      } else {
+        return 100 * completedTaskCount / taskCount;
+      }
+    } else {
+      return 0;
+    }
+  }
 }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 7701ebf52d..035924e603 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -22,6 +22,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.zeppelin.interpreter.AbstractInterpreter;
 import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -73,7 +74,7 @@ public class SparkInterpreter extends AbstractInterpreter {
   private SparkContext sc;
   private JavaSparkContext jsc;
   private SQLContext sqlContext;
-  private Object sparkSession;
+  private SparkSession sparkSession;
 
   private SparkVersion sparkVersion;
   private String scalaVersion;
@@ -187,14 +188,14 @@ public class SparkInterpreter extends AbstractInterpreter {
                     .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), innerInterpreterClazz.getClassLoader(), scalaShellOutputDir);
   }
 
-    @Override
+  @Override
   public void close() throws InterpreterException {
     LOGGER.info("Close SparkInterpreter");
     if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) {
       innerInterpreter.close();
       innerInterpreterClazz = null;
     }
-      innerInterpreter = null;
+    innerInterpreter = null;
   }
 
   @Override
@@ -228,7 +229,7 @@ public class SparkInterpreter extends AbstractInterpreter {
 
   @Override
   public int getProgress(InterpreterContext context) throws InterpreterException {
-    return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context);
+    return innerInterpreter.getProgress(context);
   }
 
   public ZeppelinContext getZeppelinContext() {
@@ -240,7 +241,7 @@ public class SparkInterpreter extends AbstractInterpreter {
 
   public InterpreterResult delegateInterpret(KotlinInterpreter kotlinInterpreter,
                                              String code,
-                                             InterpreterContext context) {
+                                             InterpreterContext context) throws InterpreterException{
     return innerInterpreter.delegateInterpret(kotlinInterpreter, code, context);
   }
 
@@ -248,14 +249,7 @@ public class SparkInterpreter extends AbstractInterpreter {
     return this.sc;
   }
 
-  /**
-   * Must use Object, because the its api signature in Spark 1.x is different from
-   * that of Spark 2.x.
-   * e.g. SqlContext.sql(sql) return different type.
-   *
-   * @return
-   */
-  public Object getSQLContext() {
+  public SQLContext getSQLContext() {
     return sqlContext;
   }
 
@@ -263,7 +257,7 @@ public class SparkInterpreter extends AbstractInterpreter {
     return this.jsc;
   }
 
-  public Object getSparkSession() {
+  public SparkSession getSparkSession() {
     return sparkSession;
   }
 
@@ -297,11 +291,11 @@ public class SparkInterpreter extends AbstractInterpreter {
     }
   }
 
-  public boolean isScala211() throws InterpreterException {
+  public boolean isScala211() {
     return scalaVersion.equals("2.11");
   }
 
-  public boolean isScala212() throws InterpreterException {
+  public boolean isScala212() {
     return scalaVersion.equals("2.12");
   }
 
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 4512ddbe6d..4335b6a107 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -148,9 +148,4 @@ public class SparkRInterpreter extends RInterpreter {
     return sparkInterpreter.getZeppelinContext();
   }
 
-  @Override
-  public List<InterpreterCompletion> completion(String buf, int cursor,
-                                                InterpreterContext interpreterContext) {
-    return new ArrayList<>();
-  }
 }
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index cb83bec24e..b2b9a69d32 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.spark;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.SQLContext;
 import org.apache.zeppelin.interpreter.AbstractInterpreter;
 import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -57,7 +58,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
     this.sqlSplitter = new SqlSplitter();
   }
 
-  public boolean concurrentSQL() {
+  private boolean concurrentSQL() {
     return Boolean.parseBoolean(getProperty("zeppelin.spark.concurrentSQL"));
   }
 
@@ -83,7 +84,7 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
     }
     Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
     sparkInterpreter.getZeppelinContext().setInterpreterContext(context);
-    Object sqlContext = sparkInterpreter.getSQLContext();
+    SQLContext sqlContext = sparkInterpreter.getSQLContext();
     SparkContext sc = sparkInterpreter.getSparkContext();
 
     List<String> sqls = sqlSplitter.splitSql(st);
@@ -96,11 +97,10 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
     ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(sparkInterpreter.getScalaShellClassLoader());
-      Method method = sqlContext.getClass().getMethod("sql", String.class);
       for (String sql : sqls) {
         curSql = sql;
         String result = sparkInterpreter.getZeppelinContext()
-                .showData(method.invoke(sqlContext, sql), maxResult);
+                .showData(sqlContext.sql(sql), maxResult);
         context.out.write(result);
       }
       context.out.flush();
@@ -156,7 +156,6 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
     return FormType.SIMPLE;
   }
 
-
   @Override
   public int getProgress(InterpreterContext context) throws InterpreterException {
     return sparkInterpreter.getProgress(context);
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index ea8fb8b4d0..578af8ed47 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -27,83 +27,17 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Utility and helper functions for the Spark Interpreter
  */
 class Utils {
-  public static Logger logger = LoggerFactory.getLogger(Utils.class);
-  public static String DEPRRECATED_MESSAGE =
+  private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
+  private static String DEPRECATED_MESSAGE =
           "%html <font color=\"red\">Spark lower than 2.2 is deprecated, " +
           "if you don't want to see this message, please set " +
           "zeppelin.spark.deprecateMsg.show to false.</font>";
 
-  static Object invokeMethod(Object o, String name) {
-    return invokeMethod(o, name, new Class[]{}, new Object[]{});
-  }
-
-  static Object invokeMethod(Object o, String name, Class<?>[] argTypes, Object[] params) {
-    try {
-      return o.getClass().getMethod(name, argTypes).invoke(o, params);
-    } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
-      logger.error(e.getMessage(), e);
-    }
-    return null;
-  }
-
-  static Object invokeStaticMethod(Class<?> c, String name, Class<?>[] argTypes, Object[] params) {
-    try {
-      return c.getMethod(name, argTypes).invoke(null, params);
-    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
-      logger.error(e.getMessage(), e);
-    }
-    return null;
-  }
-
-  static Object invokeStaticMethod(Class<?> c, String name) {
-    return invokeStaticMethod(c, name, new Class[]{}, new Object[]{});
-  }
-
-  static Class<?> findClass(String name) {
-    return findClass(name, false);
-  }
-
-  static Class<?> findClass(String name, boolean silence) {
-    try {
-      return Class.forName(name);
-    } catch (ClassNotFoundException e) {
-      if (!silence) {
-        logger.error(e.getMessage(), e);
-      }
-      return null;
-    }
-  }
-
-  static Object instantiateClass(String name, Class<?>[] argTypes, Object[] params) {
-    try {
-      Constructor<?> constructor = Utils.class.getClassLoader()
-              .loadClass(name).getConstructor(argTypes);
-      return constructor.newInstance(params);
-    } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException |
-      InstantiationException | InvocationTargetException e) {
-      logger.error(e.getMessage(), e);
-    }
-    return null;
-  }
-
-  // function works after intp is initialized
-  static boolean isScala2_10() {
-    try {
-      Class.forName("org.apache.spark.repl.SparkIMain");
-      return true;
-    } catch (ClassNotFoundException e) {
-      return false;
-    } catch (IncompatibleClassChangeError e) {
-      return false;
-    }
-  }
 
   public static String buildJobGroupId(InterpreterContext context) {
     String uName = "anonymous";
@@ -136,7 +70,7 @@ class Utils {
             && Boolean.parseBoolean(
                     properties.getProperty("zeppelin.spark.deprecatedMsg.show", "true"))) {
       try {
-        context.out.write(DEPRRECATED_MESSAGE);
+        context.out.write(DEPRECATED_MESSAGE);
         context.out.write("%text ");
       } catch (IOException e) {
         throw new InterpreterException(e);
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 045d91ab46..3e89730bd2 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -140,12 +140,6 @@ public class SparkInterpreterTest {
     result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    if (!interpreter.isScala213()) {
-      // $intp not available for scala-2.13
-      result = interpreter.interpret("$intp", getInterpreterContext());
-      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    }
-    
     // Companion object with case class
     result = interpreter.interpret("import scala.math._\n" +
         "object Circle {\n" +
@@ -207,6 +201,7 @@ public class SparkInterpreterTest {
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
+    context = getInterpreterContext();
     result = interpreter.interpret(
         "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n" +
             "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
@@ -216,8 +211,8 @@ public class SparkInterpreterTest {
             "            s(3).replaceAll(\"\\\"\", \"\"),\n" +
             "            s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
             "        )\n" +
-            ").toDF()", getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+            ").toDF()", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
 
     // spark version
     result = interpreter.interpret("sc.version", getInterpreterContext());
diff --git a/spark/scala-2.11/spark-scala-parent b/spark/scala-2.11/spark-scala-parent
deleted file mode 120000
index e5e899e58c..0000000000
--- a/spark/scala-2.11/spark-scala-parent
+++ /dev/null
@@ -1 +0,0 @@
-../spark-scala-parent
\ No newline at end of file
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 013cab42cd..bb38c71a87 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -17,92 +17,43 @@
 
 package org.apache.zeppelin.spark
 
-import java.io.{BufferedReader, File}
-import java.net.URLClassLoader
-import java.nio.file.{Files, Paths}
-import java.util.Properties
 import org.apache.spark.SparkConf
 import org.apache.spark.repl.SparkILoop
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult}
-import org.slf4j.LoggerFactory
-import org.slf4j.Logger
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.kotlin.KotlinInterpreter
+import org.slf4j.{Logger, LoggerFactory}
 
+import java.io.{BufferedReader, File, PrintStream}
+import java.net.URLClassLoader
+import java.nio.file.Paths
+import java.util.Properties
+import scala.collection.JavaConverters._
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
-/**
-  * SparkInterpreter for scala-2.11
-  */
-class SparkScala211Interpreter(override val conf: SparkConf,
-                               override val depFiles: java.util.List[String],
-                               override val properties: Properties,
-                               override val interpreterGroup: InterpreterGroup,
-                               override val sparkInterpreterClassLoader: URLClassLoader,
-                               val outputDir: File)
-  extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
-  import SparkScala211Interpreter._
+/**
+ * SparkInterpreter for scala-2.11.
+ * It only works for Spark 2.x, as Spark 3.x doesn't support scala-2.11
+ */
+class SparkScala211Interpreter(conf: SparkConf,
+                               depFiles: java.util.List[String],
+                               properties: Properties,
+                               interpreterGroup: InterpreterGroup,
+                               sparkInterpreterClassLoader: URLClassLoader,
+                               outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) {
 
-  lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+  private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
   private var sparkILoop: SparkILoop = _
-
   private var scalaCompletion: Completion = _
+  private val interpreterOutput = new InterpreterOutputStream(LOGGER)
+  private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME,
+    SparkStringConstants.DEFAULT_MASTER_VALUE)
 
-  override val interpreterOutput = new InterpreterOutputStream(LOGGER)
-
-  override def open(): Unit = {
-    super.open()
-    if (sparkMaster == "yarn-client") {
-      System.setProperty("SPARK_YARN_MODE", "true")
-    }
-
-    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
-    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-    val target = conf.get("spark.repl.target", "jvm-1.6")
-    val settings = new Settings()
-    settings.processArguments(List("-Yrepl-class-based",
-      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
-    settings.embeddedDefaults(sparkInterpreterClassLoader)
-    settings.usejavacp.value = true
-    settings.target.value = target
-
-    this.userJars = getUserJars()
-    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
-    settings.classpath.value = userJars.mkString(File.pathSeparator)
-
-    val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
-    val replOut = if (printReplOutput) {
-      new JPrintWriter(interpreterOutput, true)
-    } else {
-      new JPrintWriter(Console.out, true)
-    }
-    sparkILoop = new SparkILoop(None, replOut)
-    sparkILoop.settings = settings
-    sparkILoop.createInterpreter()
-
-    val in0 = getField(sparkILoop, "scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]]
-    val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))
-
-    sparkILoop.in = reader
-    sparkILoop.initializeSynchronous()
-    loopPostInit(this)
-    this.scalaCompletion = reader.completion
-
-    createSparkContext()
-    scalaInterpret("import org.apache.spark.SparkContext._")
-    scalaInterpret("import spark.implicits._")
-    scalaInterpret("import spark.sql")
-    scalaInterpret("import org.apache.spark.sql.functions._")
-    // print empty string otherwise the last statement's output of this method
-    // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code
-    scalaInterpret("print(\"\")")
-    createZeppelinContext()
-  }
-
-  def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+  override def interpret(code: String, context: InterpreterContext): InterpreterResult = {
 
     val originalOut = System.out
     val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean
@@ -153,20 +104,21 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     }
 
     lastStatus match {
-      case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" )
+      case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression")
       case _ => new InterpreterResult(lastStatus)
     }
   }
 
-  protected override def completion(buf: String,
-                                    cursor: Int,
-                                    context: InterpreterContext): java.util.List[InterpreterCompletion] = {
-    val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates
+  override def completion(buf: String,
+                          cursor: Int,
+                          context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    scalaCompletion.completer().complete(buf.substring(0, cursor), cursor)
+      .candidates
       .map(e => new InterpreterCompletion(e, e, null))
-    scala.collection.JavaConversions.seqAsJavaList(completions)
+      .asJava
   }
 
-  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+  private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
     sparkILoop.beQuietDuring {
       val result = sparkILoop.bind(name, tpe, value, modifier)
       if (result != IR.Success) {
@@ -175,43 +127,155 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     }
   }
 
+  override def bind(name: String,
+                    tpe: String,
+                    value: Object,
+                    modifier: java.util.List[String]): Unit =
+    bind(name, tpe, value, modifier.asScala.toList)
+
+  private def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
+    sparkILoop.interpret(code)
+
+  @throws[InterpreterException]
+  def scalaInterpretQuietly(code: String): Unit = {
+    scalaInterpret(code) match {
+      case scala.tools.nsc.interpreter.Results.Success =>
+      // do nothing
+      case scala.tools.nsc.interpreter.Results.Error =>
+        throw new InterpreterException("Fail to run code: " + code)
+      case scala.tools.nsc.interpreter.Results.Incomplete =>
+        throw new InterpreterException("Incomplete code: " + code)
+    }
+  }
+
+  override def getScalaShellClassLoader: ClassLoader = {
+    sparkILoop.classLoader
+  }
+
+  // Used by KotlinSparkInterpreter
+  override def delegateInterpret(interpreter: KotlinInterpreter,
+                                 code: String,
+                                 context: InterpreterContext): InterpreterResult = {
+    val out = context.out
+    val newOut = if (out != null) new PrintStream(out) else null
+    Console.withOut(newOut) {
+      interpreter.interpret(code, context)
+    }
+  }
+
   override def close(): Unit = {
     super.close()
     if (sparkILoop != null) {
       sparkILoop.closeInterpreter()
-      sparkILoop = null
     }
   }
 
-  def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
-    sparkILoop.interpret(code)
+  override def createSparkILoop(): Unit = {
+    if (sparkMaster == "yarn-client") {
+      System.setProperty("SPARK_YARN_MODE", "true")
+    }
 
-  override def getScalaShellClassLoader: ClassLoader = {
-    sparkILoop.classLoader
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+    val target = conf.get("spark.repl.target", "jvm-1.6")
+    val settings = new Settings()
+    settings.processArguments(List("-Yrepl-class-based",
+      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
+    settings.embeddedDefaults(sparkInterpreterClassLoader)
+    settings.usejavacp.value = true
+    settings.target.value = target
+    val userJars = getUserJars()
+    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+    settings.classpath.value = userJars.mkString(File.pathSeparator)
+
+    val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
+    val replOut = if (printReplOutput) {
+      new JPrintWriter(interpreterOutput, true)
+    } else {
+      new JPrintWriter(Console.out, true)
+    }
+    sparkILoop = new SparkILoop(None, replOut)
+    sparkILoop.settings = settings
+    sparkILoop.createInterpreter()
+
+    val in0 = getField(sparkILoop, "scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]]
+    val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))
+
+    sparkILoop.in = reader
+    sparkILoop.initializeSynchronous()
+    SparkScala211Interpreter.loopPostInit(this)
+    this.scalaCompletion = reader.completion
+  }
+
+  override def createZeppelinContext(): Unit = {
+    val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
+    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+    z = new SparkZeppelinContext(sc, sparkShims,
+      interpreterGroup.getInterpreterHookRegistry,
+      properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
+    bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
+  }
+
+  private def getField(obj: Object, name: String): Object = {
+    val field = obj.getClass.getField(name)
+    field.setAccessible(true)
+    field.get(obj)
+  }
+
+  private def callMethod(obj: Object, name: String,
+                         parameterTypes: Array[Class[_]],
+                         parameters: Array[Object]): Object = {
+    val method = obj.getClass.getMethod(name, parameterTypes: _ *)
+    method.setAccessible(true)
+    method.invoke(obj, parameters: _ *)
+  }
+
+  private def getUserJars(): Seq[String] = {
+    var classLoader = Thread.currentThread().getContextClassLoader
+    var extraJars = Seq.empty[String]
+    while (classLoader != null) {
+      if (classLoader.getClass.getCanonicalName ==
+        "org.apache.spark.util.MutableURLClassLoader") {
+        extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+          // Check if the file exists.
+          .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+          // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+          .filterNot {
+            u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+          }
+          .map(url => url.toString).toSeq
+        classLoader = null
+      } else {
+        classLoader = classLoader.getParent
+      }
+    }
+
+    extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath())
+    LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+    extraJars
   }
 }
 
 private object SparkScala211Interpreter {
 
   /**
-    * This is a hack to call `loopPostInit` at `ILoop`. At higher version of Scala such
-    * as 2.11.12, `loopPostInit` became a nested function which is inaccessible. Here,
-    * we redefine `loopPostInit` at Scala's 2.11.8 side and ignore `loadInitFiles` being called at
-    * Scala 2.11.12 since here we do not have to load files.
-    *
-    * Both methods `loopPostInit` and `unleashAndSetPhase` are redefined, and `phaseCommand` and
-    * `asyncMessage` are being called via reflection since both exist in Scala 2.11.8 and 2.11.12.
-    *
-    * Please see the codes below:
-    * https://github.com/scala/scala/blob/v2.11.8/src/repl/scala/tools/nsc/interpreter/ILoop.scala
-    * https://github.com/scala/scala/blob/v2.11.12/src/repl/scala/tools/nsc/interpreter/ILoop.scala
-    *
-    * See also ZEPPELIN-3810.
-    */
+   * This is a hack to call `loopPostInit` at `ILoop`. At higher version of Scala such
+   * as 2.11.12, `loopPostInit` became a nested function which is inaccessible. Here,
+   * we redefine `loopPostInit` at Scala's 2.11.8 side and ignore `loadInitFiles` being called at
+   * Scala 2.11.12 since here we do not have to load files.
+   *
+   * Both methods `loopPostInit` and `unleashAndSetPhase` are redefined, and `phaseCommand` and
+   * `asyncMessage` are being called via reflection since both exist in Scala 2.11.8 and 2.11.12.
+   *
+   * Please see the code below:
+   * https://github.com/scala/scala/blob/v2.11.8/src/repl/scala/tools/nsc/interpreter/ILoop.scala
+   * https://github.com/scala/scala/blob/v2.11.12/src/repl/scala/tools/nsc/interpreter/ILoop.scala
+   *
+   * See also ZEPPELIN-3810.
+   */
   private def loopPostInit(interpreter: SparkScala211Interpreter): Unit = {
     import StdReplTags._
-    import scala.reflect.classTag
-    import scala.reflect.io
+    import scala.reflect.{classTag, io}
 
     val sparkILoop = interpreter.sparkILoop
     val intp = sparkILoop.intp
@@ -258,5 +322,4 @@ private object SparkScala211Interpreter {
 
     loopPostInit()
   }
-
 }
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
similarity index 99%
copy from spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
copy to spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index fa4188c036..410ed4cf54 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -20,7 +20,6 @@ package org.apache.zeppelin.spark
 import java.util
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.DataFrame
 import org.apache.zeppelin.annotation.ZeppelinApi
 import org.apache.zeppelin.display.AngularObjectWatcher
 import org.apache.zeppelin.display.ui.OptionInput.ParamOption
diff --git a/spark/scala-2.12/spark-scala-parent b/spark/scala-2.12/spark-scala-parent
deleted file mode 120000
index e5e899e58c..0000000000
--- a/spark/scala-2.12/spark-scala-parent
+++ /dev/null
@@ -1 +0,0 @@
-../spark-scala-parent
\ No newline at end of file
diff --git a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index f31293239a..764dda93b0 100644
--- a/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -17,88 +17,44 @@
 
 package org.apache.zeppelin.spark
 
-import java.io.{BufferedReader, File}
-import java.net.URLClassLoader
-import java.util.Properties
 import org.apache.spark.SparkConf
 import org.apache.spark.repl.SparkILoop
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult}
-import org.slf4j.LoggerFactory
-import org.slf4j.Logger
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.kotlin.KotlinInterpreter
+import org.slf4j.{Logger, LoggerFactory}
 
+import java.io.{BufferedReader, File, PrintStream}
+import java.net.URLClassLoader
+import java.nio.file.Paths
+import java.util.Properties
+import scala.collection.JavaConverters._
 import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.ILoop.loopToInterpreter
 import scala.tools.nsc.interpreter._
 
+
 /**
-  * SparkInterpreter for scala-2.12
-  */
-class SparkScala212Interpreter(override val conf: SparkConf,
-                               override val depFiles: java.util.List[String],
-                               override val properties: Properties,
-                               override val interpreterGroup: InterpreterGroup,
-                               override val sparkInterpreterClassLoader: URLClassLoader,
-                               val outputDir: File)
-  extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
-
-  lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+ * SparkInterpreter for scala-2.12.
+ * It is used by both Spark 2.x and 3.x
+ */
+class SparkScala212Interpreter(conf: SparkConf,
+                               depFiles: java.util.List[String],
+                               properties: Properties,
+                               interpreterGroup: InterpreterGroup,
+                               sparkInterpreterClassLoader: URLClassLoader,
+                               outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) {
 
-  private var sparkILoop: SparkILoop = _
+  private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
+  private var sparkILoop: SparkILoop = _
   private var scalaCompletion: Completion = _
+  private val interpreterOutput = new InterpreterOutputStream(LOGGER)
+  private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME,
+    SparkStringConstants.DEFAULT_MASTER_VALUE)
 
-  override val interpreterOutput = new InterpreterOutputStream(LOGGER)
-
-  override def open(): Unit = {
-    super.open()
-    if (sparkMaster == "yarn-client") {
-      System.setProperty("SPARK_YARN_MODE", "true")
-    }
-
-    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
-    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-
-    val settings = new Settings()
-    settings.processArguments(List("-Yrepl-class-based",
-      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
-    settings.embeddedDefaults(sparkInterpreterClassLoader)
-    settings.usejavacp.value = true
-    this.userJars = getUserJars()
-    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
-    settings.classpath.value = userJars.mkString(File.pathSeparator)
-
-    val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
-    val replOut = if (printReplOutput) {
-      new JPrintWriter(interpreterOutput, true)
-    } else {
-      new JPrintWriter(Console.out, true)
-    }
-    sparkILoop = new SparkILoop(None, replOut)
-    sparkILoop.settings = settings
-    sparkILoop.createInterpreter()
-    val in0 = getDeclareField(sparkILoop, "in0").asInstanceOf[Option[BufferedReader]]
-    val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))
-
-    sparkILoop.in = reader
-    sparkILoop.initializeSynchronous()
-    SparkScala212Interpreter.loopPostInit(this)
-    this.scalaCompletion = reader.completion
-
-    createSparkContext()
-
-    scalaInterpret("import org.apache.spark.SparkContext._")
-    scalaInterpret("import spark.implicits._")
-    scalaInterpret("import spark.sql")
-    scalaInterpret("import org.apache.spark.sql.functions._")
-    // print empty string otherwise the last statement's output of this method
-    // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code
-    scalaInterpret("print(\"\")")
-
-    createZeppelinContext()
-  }
-
-  def interpret(code: String, context: InterpreterContext): InterpreterResult = {
+  override def interpret(code: String, context: InterpreterContext): InterpreterResult = {
 
     val originalOut = System.out
     val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean
@@ -149,20 +105,21 @@ class SparkScala212Interpreter(override val conf: SparkConf,
     }
 
     lastStatus match {
-      case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" )
+      case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression")
       case _ => new InterpreterResult(lastStatus)
     }
   }
 
-  protected override def completion(buf: String,
-                           cursor: Int,
-                           context: InterpreterContext): java.util.List[InterpreterCompletion] = {
-    val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates
+  override def completion(buf: String,
+                          cursor: Int,
+                          context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    scalaCompletion.complete(buf.substring(0, cursor), cursor)
+      .candidates
       .map(e => new InterpreterCompletion(e, e, null))
-    scala.collection.JavaConversions.seqAsJavaList(completions)
+      .asJava
   }
 
-  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+  private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
     sparkILoop.beQuietDuring {
       val result = sparkILoop.bind(name, tpe, value, modifier)
       if (result != IR.Success) {
@@ -171,22 +128,134 @@ class SparkScala212Interpreter(override val conf: SparkConf,
     }
   }
 
+  override def bind(name: String,
+                    tpe: String,
+                    value: Object,
+                    modifier: java.util.List[String]): Unit =
+    bind(name, tpe, value, modifier.asScala.toList)
+
+  def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
+    sparkILoop.interpret(code)
+
+  @throws[InterpreterException]
+  def scalaInterpretQuietly(code: String): Unit = {
+    scalaInterpret(code) match {
+      case scala.tools.nsc.interpreter.Results.Success =>
+      // do nothing
+      case scala.tools.nsc.interpreter.Results.Error =>
+        throw new InterpreterException("Fail to run code: " + code)
+      case scala.tools.nsc.interpreter.Results.Incomplete =>
+        throw new InterpreterException("Incomplete code: " + code)
+    }
+  }
+
+  override def getScalaShellClassLoader: ClassLoader = {
+    sparkILoop.classLoader
+  }
+
+  // Used by KotlinSparkInterpreter
+  override def delegateInterpret(interpreter: KotlinInterpreter,
+                        code: String,
+                        context: InterpreterContext): InterpreterResult = {
+    val out = context.out
+    val newOut = if (out != null) new PrintStream(out) else null
+    Console.withOut(newOut) {
+      interpreter.interpret(code, context)
+    }
+  }
+
+  def interpret(code: String): InterpreterResult =
+    interpret(code, InterpreterContext.get())
 
   override def close(): Unit = {
     super.close()
     if (sparkILoop != null) {
       sparkILoop.closeInterpreter()
-      sparkILoop = null
     }
   }
 
-  def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
-    sparkILoop.interpret(code)
+  override def createSparkILoop(): Unit = {
+    if (sparkMaster == "yarn-client") {
+      System.setProperty("SPARK_YARN_MODE", "true")
+    }
 
-  override def getScalaShellClassLoader: ClassLoader = {
-    sparkILoop.classLoader
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+
+    val settings = new Settings()
+    settings.processArguments(List("-Yrepl-class-based",
+      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
+    settings.embeddedDefaults(sparkInterpreterClassLoader)
+    settings.usejavacp.value = true
+    val userJars = getUserJars()
+    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+    settings.classpath.value = userJars.mkString(File.pathSeparator)
+
+    val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
+    val replOut = if (printReplOutput) {
+      new JPrintWriter(interpreterOutput, true)
+    } else {
+      new JPrintWriter(Console.out, true)
+    }
+    sparkILoop = new SparkILoop(None, replOut)
+    sparkILoop.settings = settings
+    sparkILoop.createInterpreter()
+    val in0 = getDeclareField(sparkILoop, "in0").asInstanceOf[Option[BufferedReader]]
+    val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))
+
+    sparkILoop.in = reader
+    sparkILoop.initializeSynchronous()
+    SparkScala212Interpreter.loopPostInit(this)
+    this.scalaCompletion = reader.completion
+  }
+
+  override def createZeppelinContext(): Unit = {
+    val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
+    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+    z = new SparkZeppelinContext(sc, sparkShims,
+      interpreterGroup.getInterpreterHookRegistry,
+      properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
+    bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
+  }
+
+  private def getDeclareField(obj: Object, name: String): Object = {
+    val field = obj.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(obj)
   }
 
+  private def callMethod(obj: Object, name: String,
+                           parameterTypes: Array[Class[_]],
+                           parameters: Array[Object]): Object = {
+    val method = obj.getClass.getMethod(name, parameterTypes: _ *)
+    method.setAccessible(true)
+    method.invoke(obj, parameters: _ *)
+  }
+
+  private def getUserJars(): Seq[String] = {
+    var classLoader = Thread.currentThread().getContextClassLoader
+    var extraJars = Seq.empty[String]
+    while (classLoader != null) {
+      if (classLoader.getClass.getCanonicalName ==
+        "org.apache.spark.util.MutableURLClassLoader") {
+        extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+          // Check if the file exists.
+          .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+          // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+          .filterNot {
+            u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+          }
+          .map(url => url.toString).toSeq
+        classLoader = null
+      } else {
+        classLoader = classLoader.getParent
+      }
+    }
+
+    extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath())
+    LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+    extraJars
+  }
 }
 
 private object SparkScala212Interpreter {
@@ -207,8 +276,7 @@ private object SparkScala212Interpreter {
    */
   private def loopPostInit(interpreter: SparkScala212Interpreter): Unit = {
     import StdReplTags._
-    import scala.reflect.classTag
-    import scala.reflect.io
+    import scala.reflect.{classTag, io}
 
     val sparkILoop = interpreter.sparkILoop
     val intp = sparkILoop.intp
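
    The getUserJars() helper duplicated into each Scala module above walks the
    context classloader chain looking for Spark's MutableURLClassLoader and
    collects the jar files it exposes. A minimal, self-contained sketch of the
    same pattern follows; the names UserJarWalkSketch and collectUserJars are
    illustrative only and are not part of this commit. When no
    MutableURLClassLoader is on the chain (e.g. outside a Spark REPL), the walk
    simply returns an empty sequence.

    import java.io.File
    import java.net.URLClassLoader
    import java.nio.file.Paths

    object UserJarWalkSketch {
      // Climb the context classloader chain until Spark's MutableURLClassLoader
      // is found, then collect the jar files it exposes.
      def collectUserJars(): Seq[String] = {
        var classLoader = Thread.currentThread().getContextClassLoader
        var jars = Seq.empty[String]
        while (classLoader != null) {
          if (classLoader.getClass.getCanonicalName == "org.apache.spark.util.MutableURLClassLoader") {
            jars = classLoader.asInstanceOf[URLClassLoader].getURLs
              // keep only URLs that point at existing local files
              .filter(u => u.getProtocol == "file" && new File(u.getPath).isFile)
              // skip badly packaged jars that bundle the wrong scala-reflect
              .filterNot(u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect"))
              .map(_.toString).toSeq
            classLoader = null  // stop the walk once the Spark loader is found
          } else {
            classLoader = classLoader.getParent
          }
        }
        jars
      }

      def main(args: Array[String]): Unit = collectUserJars().foreach(println)
    }
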
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
similarity index 99%
copy from spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
copy to spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index fa4188c036..410ed4cf54 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -20,7 +20,6 @@ package org.apache.zeppelin.spark
 import java.util
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.DataFrame
 import org.apache.zeppelin.annotation.ZeppelinApi
 import org.apache.zeppelin.display.AngularObjectWatcher
 import org.apache.zeppelin.display.ui.OptionInput.ParamOption
diff --git a/spark/scala-2.13/spark-scala-parent b/spark/scala-2.13/spark-scala-parent
deleted file mode 120000
index e5e899e58c..0000000000
--- a/spark/scala-2.13/spark-scala-parent
+++ /dev/null
@@ -1 +0,0 @@
-../spark-scala-parent
\ No newline at end of file
diff --git a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
index f896101dda..91afc8545c 100644
--- a/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
+++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkScala213Interpreter.scala
@@ -21,78 +21,40 @@ package org.apache.zeppelin.spark
 import org.apache.spark.SparkConf
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterGroup, InterpreterResult}
+import org.apache.zeppelin.kotlin.KotlinInterpreter
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{File, PrintWriter}
+import java.io.{File, PrintStream, PrintWriter}
 import java.net.URLClassLoader
+import java.nio.file.Paths
 import java.util.Properties
+import scala.jdk.CollectionConverters._
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 import scala.tools.nsc.interpreter.shell.{Accumulator, Completion, ReplCompletion}
 
 /**
-  * SparkInterpreter for scala-2.13
-  */
-class SparkScala213Interpreter(override val conf: SparkConf,
-                               override val depFiles: java.util.List[String],
-                               override val properties: Properties,
-                               override val interpreterGroup: InterpreterGroup,
-                               override val sparkInterpreterClassLoader: URLClassLoader,
-                               val outputDir: File)
-  extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
-
-  lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
+ * SparkInterpreter for scala-2.13.
+ * It only works for Spark 3.x because only Spark 3.x supports scala-2.13.
+ */
+class SparkScala213Interpreter(conf: SparkConf,
+                               depFiles: java.util.List[String],
+                               properties: Properties,
+                               interpreterGroup: InterpreterGroup,
+                               sparkInterpreterClassLoader: URLClassLoader,
+                               outputDir: File) extends AbstractSparkScalaInterpreter(conf, properties, depFiles) {
 
-  private var sparkILoop: SparkILoop = _
+  private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
+  private var sparkILoop: SparkILoop = _
   private var scalaCompletion: Completion = _
+  private val interpreterOutput = new InterpreterOutputStream(LOGGER)
+  private val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME,
+    SparkStringConstants.DEFAULT_MASTER_VALUE)
 
-  override val interpreterOutput = new InterpreterOutputStream(LOGGER)
-
-  override def open(): Unit = {
-    super.open()
-    if (sparkMaster == "yarn-client") {
-      System.setProperty("SPARK_YARN_MODE", "true")
-    }
-    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
-    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-
-    val settings = new Settings()
-    settings.processArguments(List("-Yrepl-class-based",
-      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
-    settings.embeddedDefaults(sparkInterpreterClassLoader)
-    settings.usejavacp.value = true
-    this.userJars = getUserJars()
-    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
-    settings.classpath.value = userJars.mkString(File.pathSeparator)
-
-    val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
-    val replOut = if (printReplOutput) {
-      new PrintWriter(interpreterOutput, true)
-    } else {
-      new PrintWriter(Console.out, true)
-    }
-    sparkILoop = new SparkILoop(null, replOut)
-    sparkILoop.run(settings)
-    this.scalaCompletion = new ReplCompletion(sparkILoop.intp, new Accumulator)
-    Thread.currentThread.setContextClassLoader(sparkILoop.classLoader)
-
-    createSparkContext()
-
-    scalaInterpret("import org.apache.spark.SparkContext._")
-    scalaInterpret("import spark.implicits._")
-    scalaInterpret("import spark.sql")
-    scalaInterpret("import org.apache.spark.sql.functions._")
-    // print empty string otherwise the last statement's output of this method
-    // (aka. import org.apache.spark.sql.functions._) will mix with the output of user code
-    scalaInterpret("print(\"\")")
-
-    createZeppelinContext()
-  }
 
   def interpret(code: String, context: InterpreterContext): InterpreterResult = {
-
     val originalOut = System.out
     val printREPLOutput = context.getStringLocalProperty("printREPLOutput", "true").toBoolean
 
@@ -142,23 +104,36 @@ class SparkScala213Interpreter(override val conf: SparkConf,
     }
 
     lastStatus match {
-      case InterpreterResult.Code.INCOMPLETE => new InterpreterResult( lastStatus, "Incomplete expression" )
+      case InterpreterResult.Code.INCOMPLETE => new InterpreterResult(lastStatus, "Incomplete expression")
       case _ => new InterpreterResult(lastStatus)
     }
   }
 
-  def scalaInterpret(code: String): scala.tools.nsc.interpreter.Results.Result =
+  private def scalaInterpret(code: String): scala.tools.nsc.interpreter.Results.Result =
     sparkILoop.interpret(code)
 
-  protected override def completion(buf: String,
-                                    cursor: Int,
-                                    context: InterpreterContext): java.util.List[InterpreterCompletion] = {
-    val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates
+  @throws[InterpreterException]
+  def scalaInterpretQuietly(code: String): Unit = {
+    scalaInterpret(code) match {
+      case scala.tools.nsc.interpreter.Results.Success =>
+        // do nothing
+      case scala.tools.nsc.interpreter.Results.Error =>
+        throw new InterpreterException("Failed to run code: " + code)
+      case scala.tools.nsc.interpreter.Results.Incomplete =>
+        throw new InterpreterException("Incomplete code: " + code)
+    }
+  }
+
+  override def completion(buf: String,
+                          cursor: Int,
+                          context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    scalaCompletion.complete(buf.substring(0, cursor), cursor)
+      .candidates
       .map(e => new InterpreterCompletion(e.defString, e.defString, null))
-    scala.collection.JavaConverters.asJava(completions)
+      .asJava
   }
 
-  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
+  private def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
     sparkILoop.beQuietDuring {
       val result = sparkILoop.bind(name, tpe, value, modifier)
       if (result != Results.Success) {
@@ -167,6 +142,27 @@ class SparkScala213Interpreter(override val conf: SparkConf,
     }
   }
 
+  override def bind(name: String,
+                    tpe: String,
+                    value: Object,
+                    modifier: java.util.List[String]): Unit =
+    bind(name, tpe, value, modifier.asScala.toList)
+
+  override def getScalaShellClassLoader: ClassLoader = {
+    sparkILoop.classLoader
+  }
+
+  // Used by KotlinSparkInterpreter
+  def delegateInterpret(interpreter: KotlinInterpreter,
+                        code: String,
+                        context: InterpreterContext): InterpreterResult = {
+    val out = context.out
+    val newOut = if (out != null) new PrintStream(out) else null
+    Console.withOut(newOut) {
+      interpreter.interpret(code, context)
+    }
+  }
+
   override def close(): Unit = {
     super.close()
     if (sparkILoop != null) {
@@ -174,7 +170,66 @@ class SparkScala213Interpreter(override val conf: SparkConf,
     }
   }
 
-  override def getScalaShellClassLoader: ClassLoader = {
-    sparkILoop.classLoader
+  override def createSparkILoop(): Unit = {
+    if (sparkMaster == "yarn-client") {
+      System.setProperty("SPARK_YARN_MODE", "true")
+    }
+    LOGGER.info("Scala shell repl output dir: " + outputDir.getAbsolutePath)
+    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
+
+    // create the Scala REPL settings
+    val settings = new Settings()
+    settings.processArguments(List("-Yrepl-class-based",
+      "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
+    settings.embeddedDefaults(sparkInterpreterClassLoader)
+    settings.usejavacp.value = true
+    val userJars = getUserJars()
+    LOGGER.info("UserJars: " + userJars.mkString(File.pathSeparator))
+    settings.classpath.value = userJars.mkString(File.pathSeparator)
+
+    val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
+    val replOut = if (printReplOutput) {
+      new PrintWriter(interpreterOutput, true)
+    } else {
+      new PrintWriter(Console.out, true)
+    }
+    sparkILoop = new SparkILoop(null, replOut)
+    sparkILoop.run(settings)
+    this.scalaCompletion = new ReplCompletion(sparkILoop.intp, new Accumulator)
+    Thread.currentThread.setContextClassLoader(sparkILoop.classLoader)
+  }
+
+  override def createZeppelinContext(): Unit = {
+    val sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
+    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+    z = new SparkZeppelinContext(sc, sparkShims,
+      interpreterGroup.getInterpreterHookRegistry,
+      properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
+    bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
+  }
+
+  private def getUserJars(): Seq[String] = {
+    var classLoader = Thread.currentThread().getContextClassLoader
+    var extraJars = Seq.empty[String]
+    while (classLoader != null) {
+      if (classLoader.getClass.getCanonicalName ==
+        "org.apache.spark.util.MutableURLClassLoader") {
+        extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+          // Check if the file exists.
+          .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+          // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+          .filterNot {
+            u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+          }
+          .map(url => url.toString).toSeq
+        classLoader = null
+      } else {
+        classLoader = classLoader.getParent
+      }
+    }
+
+    extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath())
+    LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+    extraJars
   }
 }
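
    The delegateInterpret method above wraps the delegated KotlinInterpreter
    call in Console.withOut so that whatever it prints is captured by the
    paragraph's output stream rather than the JVM-wide stdout. A minimal sketch
    of that redirection pattern, using an illustrative ByteArrayOutputStream in
    place of context.out (not part of this commit):

    import java.io.{ByteArrayOutputStream, PrintStream}

    object ConsoleRedirectSketch {
      def main(args: Array[String]): Unit = {
        val buffer = new ByteArrayOutputStream()   // stands in for context.out
        val paragraphOut = new PrintStream(buffer)

        // Console.withOut rebinds Console.out (and therefore println) for the
        // duration of the block, so output lands in the paragraph's stream.
        Console.withOut(paragraphOut) {
          println("captured by the paragraph output")
        }
        println("back on regular stdout")

        assert(buffer.toString.contains("captured"))
      }
    }
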
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
similarity index 99%
rename from spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
rename to spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index fa4188c036..410ed4cf54 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/scala-2.13/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -20,7 +20,6 @@ package org.apache.zeppelin.spark
 import java.util
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.DataFrame
 import org.apache.zeppelin.annotation.ZeppelinApi
 import org.apache.zeppelin.display.AngularObjectWatcher
 import org.apache.zeppelin.display.ui.OptionInput.ParamOption
diff --git a/spark/spark-scala-parent/pom.xml b/spark/spark-scala-parent/pom.xml
index 1cc02a3a63..d00ed99bba 100644
--- a/spark/spark-scala-parent/pom.xml
+++ b/spark/spark-scala-parent/pom.xml
@@ -149,65 +149,6 @@
                 </executions>
             </plugin>
 
-            <plugin>
-                <groupId>org.codehaus.mojo</groupId>
-                <artifactId>build-helper-maven-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>add-scala-sources</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>${project.basedir}/../spark-scala-parent/src/main/scala</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>add-scala-test-sources</id>
-                        <phase>generate-test-sources</phase>
-                        <goals>
-                            <goal>add-test-source</goal>
-                        </goals>
-                        <configuration>
-                            <sources>
-                                <source>${project.basedir}/../spark-scala-parent/src/test/scala</source>
-                            </sources>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>add-resource</id>
-                        <phase>generate-resources</phase>
-                        <goals>
-                            <goal>add-resource</goal>
-                        </goals>
-                        <configuration>
-                            <resources>
-                                <resource>
-                                    <directory>${project.basedir}/../spark-scala-parent/src/main/resources</directory>
-                                </resource>
-                            </resources>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>add-test-resource</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>add-test-resource</goal>
-                        </goals>
-                        <configuration>
-                            <resources>
-                                <resource>
-                                    <directory>${project.basedir}/../spark-scala-parent/src/test/resources</directory>
-                                </resource>
-                            </resources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-
             <plugin>
                 <groupId>net.alchim31.maven</groupId>
                 <artifactId>scala-maven-plugin</artifactId>
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
deleted file mode 100644
index b07de98bfe..0000000000
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark
-
-
-import java.io.{File, IOException, PrintStream}
-import java.net.URLClassLoader
-import java.nio.file.Paths
-import java.util.concurrent.atomic.AtomicInteger
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.yarn.client.api.YarnClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup, InterpreterResult, ZeppelinContext}
-import org.apache.zeppelin.kotlin.KotlinInterpreter
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConverters._
-
-
-/**
-  * Base class for different scala versions of SparkInterpreter. It should be
-  * binary compatible between multiple scala versions.
-  *
-  * @param conf
-  * @param depFiles
-  * @param properties
-  * @param interpreterGroup
-  */
-abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
-                                         val depFiles: java.util.List[String],
-                                         val properties: java.util.Properties,
-                                         val interpreterGroup: InterpreterGroup,
-                                         val sparkInterpreterClassLoader: URLClassLoader)
-  extends AbstractSparkScalaInterpreter() {
-
-  protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
-
-  protected var sc: SparkContext = _
-
-  protected var sqlContext: SQLContext = _
-
-  protected var sparkSession: SparkSession = _
-
-  protected var userJars: Seq[String] = _
-
-  protected var sparkUrl: String = _
-
-  protected var z: SparkZeppelinContext = _
-
-  protected val interpreterOutput: InterpreterOutputStream
-
-  protected val sparkMaster: String = conf.get(SparkStringConstants.MASTER_PROP_NAME,
-    SparkStringConstants.DEFAULT_MASTER_VALUE)
-
-  protected def open(): Unit = {
-    /* Required for scoped mode.
-     * In scoped mode multiple scala compiler (repl) generates class in the same directory.
-     * Class names is not randomly generated and look like '$line12.$read$$iw$$iw'
-     * Therefore it's possible to generated class conflict(overwrite) with other repl generated
-     * class.
-     *
-     * To prevent generated class name conflict,
-     * change prefix of generated class name from each scala compiler (repl) instance.
-     *
-     * In Spark 2.x, REPL generated wrapper class name should compatible with the pattern
-     * ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
-     *
-     * As hashCode() can return a negative integer value and the minus character '-' is invalid
-     * in a package name we change it to a numeric value '0' which still conforms to the regexp.
-     *
-     */
-    System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
-
-    BaseSparkScalaInterpreter.sessionNum.incrementAndGet()
-  }
-
-  // Used by KotlinSparkInterpreter
-  def delegateInterpret(interpreter: KotlinInterpreter,
-                        code: String,
-                        context: InterpreterContext): InterpreterResult = {
-    val out = context.out
-    val newOut = if (out != null) new PrintStream(out) else null
-    Console.withOut(newOut) {
-      interpreter.interpret(code, context)
-    }
-  }
-
-  protected def interpret(code: String): InterpreterResult =
-    interpret(code, InterpreterContext.get())
-
-  protected def getProgress(jobGroup: String, context: InterpreterContext): Int = {
-    JobProgressUtil.progress(sc, jobGroup)
-  }
-
-  override def getSparkContext: SparkContext = sc
-
-  override def getSqlContext: SQLContext = sqlContext
-
-  override def getSparkSession: AnyRef = sparkSession
-
-  override def getSparkUrl: String = sparkUrl
-
-  override def getZeppelinContext: ZeppelinContext = z
-
-  protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit
-
-  // for use in java side
-  protected def bind(name: String,
-                     tpe: String,
-                     value: Object,
-                     modifier: java.util.List[String]): Unit =
-    bind(name, tpe, value, modifier.asScala.toList)
-
-  protected def close(): Unit = {
-    // delete stagingDir for yarn mode
-    if (sparkMaster.startsWith("yarn")) {
-      val hadoopConf = new YarnConfiguration()
-      val appStagingBaseDir = if (conf.contains("spark.yarn.stagingDir")) {
-        new Path(conf.get("spark.yarn.stagingDir"))
-      } else {
-        FileSystem.get(hadoopConf).getHomeDirectory()
-      }
-      val stagingDirPath = new Path(appStagingBaseDir, ".sparkStaging" + "/" + sc.applicationId)
-      cleanupStagingDirInternal(stagingDirPath, hadoopConf)
-    }
-
-    if (sc != null) {
-      sc.stop()
-      sc = null
-    }
-    if (sparkSession != null) {
-      sparkSession.getClass.getMethod("stop").invoke(sparkSession)
-      sparkSession = null
-    }
-    sqlContext = null
-    z = null
-  }
-
-  private def cleanupStagingDirInternal(stagingDirPath: Path, hadoopConf: Configuration): Unit = {
-    try {
-      val fs = stagingDirPath.getFileSystem(hadoopConf)
-      if (fs.delete(stagingDirPath, true)) {
-        LOGGER.info(s"Deleted staging directory $stagingDirPath")
-      }
-    } catch {
-      case ioe: IOException =>
-        LOGGER.warn("Failed to cleanup staging dir " + stagingDirPath, ioe)
-    }
-  }
-
-  protected def createSparkContext(): Unit = {
-    spark2CreateContext()
-  }
-
-  private def spark2CreateContext(): Unit = {
-    val sparkClz = Class.forName("org.apache.spark.sql.SparkSession$")
-    val sparkObj = sparkClz.getField("MODULE$").get(null)
-
-    val builderMethod = sparkClz.getMethod("builder")
-    val builder = builderMethod.invoke(sparkObj)
-    builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf)
-
-    if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive"
-      || conf.get("zeppelin.spark.useHiveContext", "false").toLowerCase == "true") {
-      val hiveSiteExisted: Boolean =
-        Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
-      val hiveClassesPresent =
-        sparkClz.getMethod("hiveClassesArePresent").invoke(sparkObj).asInstanceOf[Boolean]
-      if (hiveSiteExisted && hiveClassesPresent) {
-        builder.getClass.getMethod("enableHiveSupport").invoke(builder)
-        sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession]
-        LOGGER.info("Created Spark session (with Hive support)");
-      } else {
-        if (!hiveClassesPresent) {
-          LOGGER.warn("Hive support can not be enabled because spark is not built with hive")
-        }
-        if (!hiveSiteExisted) {
-          LOGGER.warn("Hive support can not be enabled because no hive-site.xml found")
-        }
-        sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession]
-        LOGGER.info("Created Spark session (without Hive support)");
-      }
-    } else {
-      sparkSession = builder.getClass.getMethod("getOrCreate").invoke(builder).asInstanceOf[SparkSession]
-      LOGGER.info("Created Spark session (without Hive support)");
-    }
-
-    sc = sparkSession.getClass.getMethod("sparkContext").invoke(sparkSession)
-      .asInstanceOf[SparkContext]
-    getUserFiles().foreach(file => sc.addFile(file))
-    sqlContext = sparkSession.getClass.getMethod("sqlContext").invoke(sparkSession)
-      .asInstanceOf[SQLContext]
-    sc.getClass.getMethod("uiWebUrl").invoke(sc).asInstanceOf[Option[String]] match {
-      case Some(url) => sparkUrl = url
-      case None =>
-    }
-
-    initAndSendSparkWebUrl()
-
-    bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient"""))
-    bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient"""))
-    bind("sqlContext", "org.apache.spark.sql.SQLContext", sqlContext, List("""@transient"""))
-  }
-
-  protected def initAndSendSparkWebUrl(): Unit = {
-    val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl");
-    if (!StringUtils.isBlank(webUiUrl)) {
-      this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId);
-    } else {
-      useYarnProxyURLIfNeeded()
-    }
-    InterpreterContext.get.getIntpEventClient.sendWebUrlInfo(this.sparkUrl)
-  }
-
-  protected def createZeppelinContext(): Unit = {
-
-    var sparkShims: SparkShims = null
-    if (isSparkSessionPresent()) {
-      sparkShims = SparkShims.getInstance(sc.version, properties, sparkSession)
-    } else {
-      sparkShims = SparkShims.getInstance(sc.version, properties, sc)
-    }
-
-    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
-    z = new SparkZeppelinContext(sc, sparkShims,
-      interpreterGroup.getInterpreterHookRegistry,
-      properties.getProperty("zeppelin.spark.maxResult", "1000").toInt)
-    bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
-  }
-
-  private def useYarnProxyURLIfNeeded() {
-    if (properties.getProperty("spark.webui.yarn.useProxy", "false").toBoolean) {
-      if (sparkMaster.startsWith("yarn")) {
-        val appId = sc.applicationId
-        val yarnClient = YarnClient.createYarnClient
-        val yarnConf = new YarnConfiguration()
-        // disable timeline service as we only query yarn app here.
-        // Otherwise we may hit this kind of ERROR:
-        // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
-        yarnConf.set("yarn.timeline-service.enabled", "false")
-        yarnClient.init(yarnConf)
-        yarnClient.start()
-        val appReport = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId))
-        this.sparkUrl = appReport.getTrackingUrl
-      }
-    }
-  }
-
-  private def isSparkSessionPresent(): Boolean = {
-    try {
-      Class.forName("org.apache.spark.sql.SparkSession")
-      true
-    } catch {
-      case _: ClassNotFoundException | _: NoClassDefFoundError => false
-    }
-  }
-
-  protected def getField(obj: Object, name: String): Object = {
-    val field = obj.getClass.getField(name)
-    field.setAccessible(true)
-    field.get(obj)
-  }
-
-  protected def getDeclareField(obj: Object, name: String): Object = {
-    val field = obj.getClass.getDeclaredField(name)
-    field.setAccessible(true)
-    field.get(obj)
-  }
-
-  protected def setDeclaredField(obj: Object, name: String, value: Object): Unit = {
-    val field = obj.getClass.getDeclaredField(name)
-    field.setAccessible(true)
-    field.set(obj, value)
-  }
-
-  protected def callMethod(obj: Object, name: String): Object = {
-    callMethod(obj, name, Array.empty[Class[_]], Array.empty[Object])
-  }
-
-  protected def callMethod(obj: Object, name: String,
-                           parameterTypes: Array[Class[_]],
-                           parameters: Array[Object]): Object = {
-    val method = obj.getClass.getMethod(name, parameterTypes: _ *)
-    method.setAccessible(true)
-    method.invoke(obj, parameters: _ *)
-  }
-
-  protected def getUserJars(): Seq[String] = {
-    var classLoader = Thread.currentThread().getContextClassLoader
-    var extraJars = Seq.empty[String]
-    while (classLoader != null) {
-      if (classLoader.getClass.getCanonicalName ==
-        "org.apache.spark.util.MutableURLClassLoader") {
-        extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
-          // Check if the file exists.
-          .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
-          // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
-          .filterNot {
-            u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
-          }
-          .map(url => url.toString).toSeq
-        classLoader = null
-      } else {
-        classLoader = classLoader.getParent
-      }
-    }
-
-    extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.getPath())
-    LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
-    extraJars
-  }
-
-  protected def getUserFiles(): Seq[String] = {
-    depFiles.asScala.toSeq.filter(!_.endsWith(".jar"))
-  }
-}
-
-object BaseSparkScalaInterpreter {
-  val sessionNum = new AtomicInteger(0)
-}
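
    For context on the removed spark2CreateContext: it built the SparkSession
    through reflection and enabled Hive support only when both hive-site.xml
    and the Hive classes were available. Roughly the same decision logic,
    sketched against the public SparkSession API, is shown below. This is an
    illustration, not the code this commit introduces; the HiveConf classpath
    probe is an approximation of the hiveClassesArePresent check that the
    removed code reached via reflection.

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object CreateSessionSketch {

      // Approximate stand-in for SparkSession.hiveClassesArePresent,
      // which is not accessible outside the spark package.
      private def hiveClassesPresent: Boolean =
        try { Class.forName("org.apache.hadoop.hive.conf.HiveConf"); true }
        catch { case _: ClassNotFoundException => false }

      def createSession(conf: SparkConf): SparkSession = {
        val useHive =
          conf.get("spark.sql.catalogImplementation", "in-memory").equalsIgnoreCase("hive") ||
            conf.get("zeppelin.spark.useHiveContext", "false").toBoolean
        val hiveSitePresent =
          Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null

        val builder = SparkSession.builder().config(conf)
        if (useHive && hiveSitePresent && hiveClassesPresent) {
          builder.enableHiveSupport().getOrCreate()   // Hive-backed catalog
        } else {
          builder.getOrCreate()                       // in-memory catalog
        }
      }
    }
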
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
deleted file mode 100644
index 79018c89a0..0000000000
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.spark
-
-import org.apache.spark.SparkContext
-import org.slf4j.{Logger, LoggerFactory}
-
-object JobProgressUtil {
-
-  protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
-
-  def progress(sc: SparkContext, jobGroup : String):Int = {
-    // Each paragraph has one unique jobGroup, and one paragraph may run multiple times.
-    // So only look for the first job which match the jobGroup
-    val jobInfo = sc.statusTracker
-      .getJobIdsForGroup(jobGroup)
-      .headOption
-      .flatMap(jobId => sc.statusTracker.getJobInfo(jobId))
-    val stagesInfoOption = jobInfo.flatMap( jobInfo => Some(jobInfo.stageIds().flatMap(sc.statusTracker.getStageInfo)))
-    stagesInfoOption match {
-      case None => 0
-      case Some(stagesInfo) =>
-        val taskCount = stagesInfo.map(_.numTasks).sum
-        val completedTaskCount = stagesInfo.map(_.numCompletedTasks).sum
-        LOGGER.debug("Total TaskCount: " + taskCount)
-        LOGGER.debug("Completed TaskCount: " + completedTaskCount)
-        if (taskCount == 0) {
-          0
-        } else {
-          (100 * completedTaskCount.toDouble / taskCount).toInt
-        }
-    }
-  }
-}
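
    The removed JobProgressUtil reported a paragraph's progress as the
    percentage of completed tasks across all stages of the first job in the
    paragraph's job group. A tiny worked sketch of that arithmetic, with
    StageSnapshot standing in for Spark's SparkStageInfo (illustrative names,
    not part of this commit):

    object ProgressSketch {
      final case class StageSnapshot(numTasks: Int, numCompletedTasks: Int)

      // progress = 100 * completed tasks / total tasks, over all stages
      def progress(stages: Seq[StageSnapshot]): Int = {
        val total = stages.map(_.numTasks).sum
        val completed = stages.map(_.numCompletedTasks).sum
        if (total == 0) 0 else (100 * completed.toDouble / total).toInt
      }

      def main(args: Array[String]): Unit = {
        // Two stages: 8 of 20 tasks done and 5 of 30 tasks done -> 13/50 -> 26%
        val stages = Seq(StageSnapshot(20, 8), StageSnapshot(30, 5))
        println(progress(stages))  // prints 26
      }
    }
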
diff --git a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
index 709f484b20..4b4ffc8891 100644
--- a/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/java/org/apache/zeppelin/spark/SparkShims.java
@@ -18,7 +18,6 @@
 package org.apache.zeppelin.spark;
 
 
-import org.apache.zeppelin.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.zeppelin.interpreter.InterpreterContext;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
index 88f0f88ee7..2b7e3a622a 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZSessionIntegrationTest.java
@@ -226,7 +226,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
       assertEquals(Status.ERROR, result.getStatus());
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found"));
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database"));
       assertEquals(0, result.getJobUrls().size());
 
     } finally {
@@ -299,7 +299,7 @@ public class ZSessionIntegrationTest extends AbstractTestRestApi {
       assertEquals(Status.ERROR, result.getStatus());
       assertEquals(1, result.getResults().size());
       assertEquals("TEXT", result.getResults().get(0).getType());
-      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view not found"));
+      assertTrue(result.getResults().get(0).getData(), result.getResults().get(0).getData().contains("Table or view 'unknown_table' not found in database"));
       assertEquals(0, result.getJobUrls().size());
 
       // cancel