Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/06/26 01:52:50 UTC

[zeppelin] branch master updated: ZEPPELIN-3552. Support Scala 2.12 of SparkInterpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new f61bddd  ZEPPELIN-3552. Support Scala 2.12 of SparkInterpreter
f61bddd is described below

commit f61bddd233587642ec7b792bb182a06d2f54dbf8
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Jun 12 19:20:59 2019 +0800

    ZEPPELIN-3552. Support Scala 2.12 of SparkInterpreter
    
    ### What is this PR for?
    
    This PR adds support for Scala 2.12 to SparkInterpreter. It also refactors the whole spark module: each Scala-version-specific interpreter is now loaded dynamically via a URLClassLoader, so the interpreter code is written once, compiled separately against each supported Scala version, and the matching build is loaded at runtime based on the current Scala version.
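
    The loading step is roughly the sketch below (a simplified illustration, not the exact code: the real interpreter constructor also takes the SparkConf, the dependency files, the properties, the interpreter group and the class loader itself):

    ```java
    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.ArrayList;
    import java.util.List;

    class ScalaInterpreterLoader {
      // Load the SparkScala<version>Interpreter that was compiled against the running
      // Scala version, e.g. "2.12" -> org.apache.zeppelin.spark.SparkScala212Interpreter.
      static Object load(String scalaVersion) throws Exception {
        // Each Scala build ships its jars under ZEPPELIN_HOME/interpreter/spark/scala-<version>.
        // (In yarn-cluster mode ZEPPELIN_HOME is unset and the current class loader is used instead.)
        File jarFolder = new File(System.getenv("ZEPPELIN_HOME")
            + "/interpreter/spark/scala-" + scalaVersion);
        List<URL> urls = new ArrayList<>();
        for (File jar : jarFolder.listFiles()) {
          urls.add(jar.toURI().toURL());
        }
        ClassLoader cl = new URLClassLoader(urls.toArray(new URL[0]),
            Thread.currentThread().getContextClassLoader());
        String className = "org.apache.zeppelin.spark.SparkScala"
            + scalaVersion.replace(".", "") + "Interpreter";
        // A no-arg constructor keeps the sketch short; the real class is instantiated
        // reflectively with its full argument list.
        return cl.loadClass(className).getDeclaredConstructor().newInstance();
      }
    }
    ```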
    
    ### What type of PR is it?
    [Feature | Refactoring]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-3552
    
    ### How should this be tested?
    * CI passed; a unit test for Scala 2.12 was added and passes
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3034 from zjffdu/ZEPPELIN-3552 and squashes the following commits:
    
    fe9c9eb12 [Jeff Zhang] [ZEPPELIN-3552]. Support Scala 2.12 of SparkInterpreter
---
 .travis.yml                                        |  34 ++--
 pom.xml                                            |   2 +-
 spark/interpreter/pom.xml                          |  42 ++---
 .../spark/AbstractSparkScalaInterpreter.java       |  71 +++++++++
 .../apache/zeppelin/spark/PySparkInterpreter.java  |   2 +-
 .../apache/zeppelin/spark/SparkInterpreter.java    | 119 ++++++++------
 .../apache/zeppelin/spark/SparkRInterpreter.java   |   8 +-
 .../main/java/org/apache/zeppelin/spark/Utils.java |  40 -----
 .../java/org/apache/zeppelin/spark/ZeppelinR.java  |   2 +-
 .../apache/zeppelin/spark/ZeppelinRContext.java    |   7 +-
 .../apache/zeppelin/spark/ZeppelinRDisplay.scala   |  12 +-
 .../src/test/resources/log4j.properties            |   2 +-
 spark/pom.xml                                      |  70 +++++----
 spark/scala-2.10/pom.xml                           |  27 +++-
 .../zeppelin/spark/SparkScala210Interpreter.scala  |  25 ++-
 spark/scala-2.11/pom.xml                           |  27 +++-
 .../zeppelin/spark/SparkScala211Interpreter.scala  |  29 +++-
 spark/{scala-2.10 => scala-2.12}/pom.xml           |  37 +++--
 spark/scala-2.12/spark-scala-parent                |   1 +
 .../src/main}/resources/log4j.properties           |   8 +-
 .../zeppelin/spark/SparkScala212Interpreter.scala} | 112 ++++---------
 spark/spark-dependencies/pom.xml                   |  46 ++++--
 spark/spark-scala-parent/pom.xml                   |  61 ++++++--
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala |  72 +++++----
 .../apache/zeppelin/spark/JobProgressUtil.scala    |   4 +-
 .../zeppelin/spark/SparkZeppelinContext.scala      |  11 +-
 .../org/apache/zeppelin/spark/SparkShims.java      |   1 -
 spark/spark2-shims/pom.xml                         |   2 +-
 zeppelin-display/pom.xml                           |   2 +-
 .../display/angular/AbstractAngularElem.scala      |   4 +-
 zeppelin-interpreter-integration/pom.xml           |  14 ++
 .../zeppelin/integration/FlinkIntegrationTest.java |   1 +
 .../zeppelin/integration/SparkIntegrationTest.java |  27 +++-
 .../integration/SparkIntegrationTest24.java        |   2 +-
 .../integration/ZeppelinSparkClusterTest.java      |   5 +-
 .../integration/ZeppelinSparkClusterTest24.java    |   2 +-
 .../src/test/resources/log4j.properties            |   1 +
 .../org/apache/zeppelin/util/ProcessLauncher.java  |   1 -
 zeppelin-plugins/launcher/spark/pom.xml            |  10 ++
 .../launcher/SparkInterpreterLauncher.java         | 173 ++++++++++++++++-----
 .../launcher/SparkInterpreterLauncherTest.java     | 100 +++++++++---
 .../launcher/StandardInterpreterLauncher.java      |   2 +-
 zeppelin-plugins/pom.xml                           |   8 +
 zeppelin-zengine/pom.xml                           |   5 +
 .../interpreter}/integration/DownloadUtils.java    |   4 +-
 45 files changed, 814 insertions(+), 421 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 0a02d08..76125d1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -83,14 +83,14 @@ matrix:
     - sudo: required
       jdk: "oraclejdk8"
       dist: trusty
-      env: PYTHON="3" SPARKR="true" PROFILE="-Pspark-2.2 -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="install -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/JdbcIntegrationTest.java,**/SparkIntegrationTest.java,**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
+      env: PYTHON="3" SPARKR="true" PROFILE="-Pspark-2.2 -Phelium-dev -Pexamples -Pspark-scala-2.11" BUILD_FLAG="install -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/JdbcIntegrationTest.java,**/SparkIntegrationTest.java,**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
 
     # Test selenium with spark module for spark 2.3
     - jdk: "oraclejdk8"
       dist: trusty
       addons:
         firefox: "31.0"
-      env: BUILD_PLUGINS="true" CI="true" PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.3.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.3 -Phadoop2 -Phelium-dev -Pexamples -Pintegration -Pscala-2.11" BUILD_FLAG="install -DskipTests -DskipRat -pl ${INTERPRETERS}" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-integration -DfailIfNoTests=false"
+      env: BUILD_PLUGINS="true" CI="true" PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.3.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.3 -Phadoop2 -Phelium-dev -Pexamples -Pintegration -Pspark-scala-2.11" BUILD_FLAG="install -DskipTests -DskipRat -pl ${INTERPRETERS}" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-integration -DfailIfNoTests=false"
 
     # Test interpreter modules
     - jdk: "oraclejdk8"
@@ -99,41 +99,47 @@ matrix:
 
     # Run Spark integration test and unit test separately for each spark version
 
-    # ZeppelinSparkClusterTest24, SparkIntegrationTest24, JdbcIntegrationTest, Unit test of Spark 2.4
+    # ZeppelinSparkClusterTest24, SparkIntegrationTest24, JdbcIntegrationTest, Unit test of Spark 2.4 (Scala-2.11)
     - sudo: required
       jdk: "oraclejdk8"
       dist: trusty
-      env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.11" PROFILE="-Pspark-2.4 -Pscala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
+      env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.11" PROFILE="-Pspark-2.4 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
-    # ZeppelinSparkClusterTest23, SparkIntegrationTest23, Unit test of Spark 2.3
+    # ZeppelinSparkClusterTest24, SparkIntegrationTest24, JdbcIntegrationTest, Unit test of Spark 2.4 (Scala-2.12)
     - sudo: required
       jdk: "oraclejdk8"
       dist: trusty
-      env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.11" PROFILE="-Pspark-2.3 -Pscala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest23,SparkIntegrationTest23,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
+      env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.12" PROFILE="-Pspark-2.4 -Pspark-scala-2.12 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
-    # ZeppelinSparkClusterTest22, SparkIntegrationTest22, Unit test of Spark 2.2
+    # ZeppelinSparkClusterTest23, SparkIntegrationTest23, Unit test of Spark 2.3 (Scala-2.11)
     - sudo: required
       jdk: "oraclejdk8"
       dist: trusty
-      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.2 -Pscala-2.10 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest22,SparkIntegrationTest22,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
+      env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.11" PROFILE="-Pspark-2.3 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest23,SparkIntegrationTest23,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
-    # ZeppelinSparkClusterTest21, SparkIntegrationTest21, Unit test of Spark 2.1
+    # ZeppelinSparkClusterTest22, SparkIntegrationTest22, Unit test of Spark 2.2 (Scala-2.10)
     - sudo: required
       jdk: "oraclejdk8"
       dist: trusty
-      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.1 -Phadoop2 -Pscala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest21,SparkIntegrationTest21,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
+      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.2 -Pspark-scala-2.10 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest22,SparkIntegrationTest22,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
-    # ZeppelinSparkClusterTest20, SparkIntegrationTest20, Unit test of Spark 2.0
+    # ZeppelinSparkClusterTest21, SparkIntegrationTest21, Unit test of Spark 2.1 (Scala-2.10)
     - sudo: required
       jdk: "oraclejdk8"
       dist: trusty
-      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.0 -Phadoop2 -Pscala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest20,SparkIntegrationTest20,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
+      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.1 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest21,SparkIntegrationTest21,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
-    # ZeppelinSparkClusterTest16, SparkIntegrationTest16, Unit test of Spark 1.6
+    # ZeppelinSparkClusterTest20, SparkIntegrationTest20, Unit test of Spark 2.0  (Scala-2.10)
     - sudo: required
       jdk: "oraclejdk8"
       dist: trusty
-      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-1.6 -Phadoop2 -Pscala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest16,SparkIntegrationTest16,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
+      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.0 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest20,SparkIntegrationTest20,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
+
+    # ZeppelinSparkClusterTest16, SparkIntegrationTest16, Unit test of Spark 1.6  (Scala-2.10)
+    - sudo: required
+      jdk: "oraclejdk8"
+      dist: trusty
+      env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-1.6 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest16,SparkIntegrationTest16,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
     # Test python/pyspark with python 2, livy 0.5
     - sudo: required
diff --git a/pom.xml b/pom.xml
index 3138cd3..17774c0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,7 +97,7 @@
     <java.version>1.8</java.version>
     <scala.version>2.10.5</scala.version>
     <scala.binary.version>2.10</scala.binary.version>
-    <scalatest.version>2.2.4</scalatest.version>
+    <scalatest.version>3.0.7</scalatest.version>
     <scalacheck.version>1.12.5</scalacheck.version>
 
     <!-- frontend maven plugin related versions-->
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index c58324f..b58cc56 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -48,7 +48,7 @@
     <datanucleus.apijdo.version>3.2.6</datanucleus.apijdo.version>
     <datanucleus.core.version>3.2.10</datanucleus.core.version>
 
-    <scala.compile.version>${scala.version}</scala.compile.version>
+    <scala.compile.version>${spark.scala.version}</scala.compile.version>
     <!-- settings -->
     <pyspark.test.exclude>**/PySparkInterpreterMatplotlibTest.java</pyspark.test.exclude>
     <pyspark.test.include>**/*Test.*</pyspark.test.include>
@@ -59,18 +59,20 @@
       <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-display</artifactId>
       <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>spark-scala-2.10</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>spark-scala-2.11</artifactId>
-      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scalap</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
@@ -126,14 +128,14 @@
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-repl_${scala.binary.version}</artifactId>
+      <artifactId>spark-repl_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <artifactId>spark-core_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
       <exclusions>
@@ -153,7 +155,7 @@
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <artifactId>spark-hive_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
@@ -172,21 +174,21 @@
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
-      <version>${scala.version}</version>
+      <version>${spark.scala.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
-      <version>${scala.version}</version>
+      <version>${spark.scala.version}</version>
       <scope>provided</scope>
     </dependency>
 
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-reflect</artifactId>
-      <version>${scala.version}</version>
+      <version>${spark.scala.version}</version>
       <scope>provided</scope>
     </dependency>
 
@@ -212,7 +214,7 @@
     <!--test libraries-->
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <artifactId>scalatest_${spark.scala.binary.version}</artifactId>
       <version>${scalatest.version}</version>
       <scope>test</scope>
     </dependency>
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
new file mode 100644
index 0000000..a4bac1f
--- /dev/null
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/AbstractSparkScalaInterpreter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.SQLContext;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+
+import java.util.List;
+
+/**
+ * This is a bridge class which bridges the communication between the Java side and the Scala side.
+ * The Java side relies on this abstract class, which is implemented by each supported Scala version.
+ */
+public abstract class AbstractSparkScalaInterpreter {
+
+  public abstract SparkContext getSparkContext();
+
+  public abstract SQLContext getSqlContext();
+
+  public abstract Object getSparkSession();
+
+  public abstract String getSparkUrl();
+
+  public abstract BaseZeppelinContext getZeppelinContext();
+
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    return getProgress(Utils.buildJobGroupId(context), context);
+  }
+
+  public abstract int getProgress(String jobGroup,
+                                  InterpreterContext context) throws InterpreterException;
+
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    getSparkContext().cancelJobGroup(Utils.buildJobGroupId(context));
+  }
+
+  public Interpreter.FormType getFormType() throws InterpreterException {
+    return Interpreter.FormType.SIMPLE;
+  }
+
+  public abstract void open();
+
+  public abstract void close();
+
+  public abstract InterpreterResult interpret(String st, InterpreterContext context);
+
+  public abstract List<InterpreterCompletion> completion(String buf,
+                                                         int cursor,
+                                                         InterpreterContext interpreterContext);
+}
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index d1433e8..960227a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -140,7 +140,7 @@ public class PySparkInterpreter extends PythonInterpreter {
     if (context.getLocalProperties().containsKey("pool")) {
       pool = "'" + context.getLocalProperties().get("pool") + "'";
     }
-    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+    String setPoolStmt = "if 'sc' in locals():\n\tsc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
     callPython(new PythonInterpretRequest(setPoolStmt, false, false));
   }
 
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 1b5b9f6..33769be 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -17,52 +17,50 @@
 
 package org.apache.zeppelin.spark;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.zeppelin.interpreter.AbstractInterpreter;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * SparkInterpreter of Java implementation. It is just wrapper of Spark211Interpreter
- * and Spark210Interpreter.
+ * SparkInterpreter implemented in Java. It delegates to the AbstractSparkScalaInterpreter for the matching Scala version.
+ *
  */
 public class SparkInterpreter extends AbstractInterpreter {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreter.class);
 
-  private BaseSparkScalaInterpreter innerInterpreter;
+  private static AtomicInteger SESSION_NUM = new AtomicInteger(0);
+  private AbstractSparkScalaInterpreter innerInterpreter;
   private Map<String, String> innerInterpreterClassMap = new HashMap<>();
   private SparkContext sc;
   private JavaSparkContext jsc;
   private SQLContext sqlContext;
   private Object sparkSession;
 
-  private SparkZeppelinContext z;
   private SparkVersion sparkVersion;
   private boolean enableSupportedVersionCheck;
   private String sparkUrl;
-  private SparkShims sparkShims;
-
-  private static InterpreterHookRegistry hooks;
 
 
   public SparkInterpreter(Properties properties) {
@@ -75,13 +73,12 @@ public class SparkInterpreter extends AbstractInterpreter {
         properties.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
     innerInterpreterClassMap.put("2.10", "org.apache.zeppelin.spark.SparkScala210Interpreter");
     innerInterpreterClassMap.put("2.11", "org.apache.zeppelin.spark.SparkScala211Interpreter");
+    innerInterpreterClassMap.put("2.12", "org.apache.zeppelin.spark.SparkScala212Interpreter");
   }
 
   @Override
   public void open() throws InterpreterException {
     try {
-      String scalaVersion = extractScalaVersion();
-      LOGGER.info("Using Scala Version: " + scalaVersion);
       SparkConf conf = new SparkConf();
       for (Map.Entry<Object, Object> entry : getProperties().entrySet()) {
         if (!StringUtils.isBlank(entry.getValue().toString())) {
@@ -99,16 +96,10 @@ public class SparkInterpreter extends AbstractInterpreter {
       }
       // use local mode for embedded spark mode when spark.master is not found
       conf.setIfMissing("spark.master", "local");
-
-      String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
-      Class clazz = Class.forName(innerIntpClassName);
-      this.innerInterpreter = (BaseSparkScalaInterpreter)
-          clazz.getConstructor(SparkConf.class, List.class, Boolean.class)
-              .newInstance(conf, getDependencyFiles(),
-                  Boolean.parseBoolean(getProperty("zeppelin.spark.printREPLOutput", "true")));
+      this.innerInterpreter = loadSparkScalaInterpreter(conf);
       this.innerInterpreter.open();
 
-      sc = this.innerInterpreter.sc();
+      sc = this.innerInterpreter.getSparkContext();
       jsc = JavaSparkContext.fromSparkContext(sc);
       sparkVersion = SparkVersion.fromVersionString(sc.version());
       if (enableSupportedVersionCheck && sparkVersion.isUnsupportedVersion()) {
@@ -116,38 +107,72 @@ public class SparkInterpreter extends AbstractInterpreter {
             + "\nYou can set zeppelin.spark.enableSupportedVersionCheck to false if you really" +
             " want to try this version of spark.");
       }
-      sqlContext = this.innerInterpreter.sqlContext();
-      sparkSession = this.innerInterpreter.sparkSession();
-      hooks = getInterpreterGroup().getInterpreterHookRegistry();
-      sparkUrl = this.innerInterpreter.sparkUrl();
+      sqlContext = this.innerInterpreter.getSqlContext();
+      sparkSession = this.innerInterpreter.getSparkSession();
+      sparkUrl = this.innerInterpreter.getSparkUrl();
       String sparkUrlProp = getProperty("zeppelin.spark.uiWebUrl", "");
       if (!StringUtils.isBlank(sparkUrlProp)) {
         sparkUrl = sparkUrlProp;
       }
-      sparkShims = SparkShims.getInstance(sc.version(), getProperties());
-      sparkShims.setupSparkListener(sc.master(), sparkUrl, InterpreterContext.get());
 
-      z = new SparkZeppelinContext(sc, sparkShims, hooks,
-          Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
-      this.innerInterpreter.bind("z", z.getClass().getCanonicalName(), z,
-          Lists.newArrayList("@transient"));
+      SESSION_NUM.incrementAndGet();
     } catch (Exception e) {
       LOGGER.error("Fail to open SparkInterpreter", e);
       throw new InterpreterException("Fail to open SparkInterpreter", e);
     }
   }
 
+  /**
+   * Load the AbstractSparkScalaInterpreter implementation that matches the runtime Scala version.
+   * Each implementation is loaded from one of the following locations:
+   *
+   * SparkScala210Interpreter   ZEPPELIN_HOME/interpreter/spark/scala-2.10
+   * SparkScala211Interpreter   ZEPPELIN_HOME/interpreter/spark/scala-2.11
+   * SparkScala212Interpreter   ZEPPELIN_HOME/interpreter/spark/scala-2.12
+   *
+   * @param conf
+   * @return AbstractSparkScalaInterpreter
+   * @throws Exception
+   */
+  private AbstractSparkScalaInterpreter loadSparkScalaInterpreter(SparkConf conf) throws Exception {
+    String scalaVersion = extractScalaVersion();
+    ClassLoader scalaInterpreterClassLoader = Thread.currentThread().getContextClassLoader();
+
+    String zeppelinHome = System.getenv("ZEPPELIN_HOME");
+    if (zeppelinHome != null) {
+      // ZEPPELIN_HOME is null in yarn-cluster mode; in that case the interpreter is loaded via the current ClassLoader.
+      // Otherwise, load it from the folder ZEPPELIN_HOME/interpreter/spark/scala-<version>.
+
+      File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion);
+      List<URL> urls = new ArrayList<>();
+      for (File file : scalaJarFolder.listFiles()) {
+        LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: "
+                + scalaJarFolder);
+        urls.add(file.toURI().toURL());
+      }
+      scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
+              Thread.currentThread().getContextClassLoader());
+    }
+
+    String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion);
+    Class clazz = scalaInterpreterClassLoader.loadClass(innerIntpClassName);
+    return (AbstractSparkScalaInterpreter)
+            clazz.getConstructor(SparkConf.class, List.class, Properties.class, InterpreterGroup.class, URLClassLoader.class)
+                    .newInstance(conf, getDependencyFiles(), getProperties(), getInterpreterGroup(), scalaInterpreterClassLoader);
+  }
+
   @Override
-  public void close() {
+  public void close() throws InterpreterException {
     LOGGER.info("Close SparkInterpreter");
-    if (innerInterpreter != null) {
+    if (SESSION_NUM.decrementAndGet() == 0 && innerInterpreter != null) {
       innerInterpreter.close();
       innerInterpreter = null;
     }
   }
 
   @Override
-  public InterpreterResult internalInterpret(String st, InterpreterContext context) {
+  public InterpreterResult internalInterpret(String st,
+                                             InterpreterContext context) throws InterpreterException {
     context.out.clear();
     sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
     // set spark.scheduler.pool to null to clear the pool associated with this paragraph
@@ -158,15 +183,14 @@ public class SparkInterpreter extends AbstractInterpreter {
   }
 
   @Override
-  public void cancel(InterpreterContext context) {
-    sc.cancelJobGroup(Utils.buildJobGroupId(context));
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    innerInterpreter.cancel(context);
   }
 
   @Override
   public List<InterpreterCompletion> completion(String buf,
                                                 int cursor,
-                                                InterpreterContext interpreterContext) {
-    LOGGER.debug("buf: " + buf + ", cursor:" + cursor);
+                                                InterpreterContext interpreterContext) throws InterpreterException {
     return innerInterpreter.completion(buf, cursor, interpreterContext);
   }
 
@@ -176,12 +200,12 @@ public class SparkInterpreter extends AbstractInterpreter {
   }
 
   @Override
-  public int getProgress(InterpreterContext context) {
+  public int getProgress(InterpreterContext context) throws InterpreterException {
     return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context);
   }
 
-  public SparkZeppelinContext getZeppelinContext() {
-    return this.z;
+  public BaseZeppelinContext getZeppelinContext() {
+    return this.innerInterpreter.getZeppelinContext();
   }
 
   public SparkContext getSparkContext() {
@@ -204,19 +228,20 @@ public class SparkInterpreter extends AbstractInterpreter {
     return sparkVersion;
   }
 
-  private String extractScalaVersion() throws IOException, InterruptedException {
+  private String extractScalaVersion() throws InterpreterException {
     String scalaVersionString = scala.util.Properties.versionString();
+    LOGGER.info("Using Scala: " + scalaVersionString);
     if (scalaVersionString.contains("version 2.10")) {
       return "2.10";
-    } else {
+    } else if (scalaVersionString.contains("version 2.11")) {
       return "2.11";
+    } else if (scalaVersionString.contains("version 2.12")) {
+      return "2.12";
+    } else {
+      throw new InterpreterException("Unsupported scala version: " + scalaVersionString);
     }
   }
 
-  public boolean isSparkContextInitialized() {
-    return this.sc != null;
-  }
-
   private List<String> getDependencyFiles() throws InterpreterException {
     List<String> depFiles = new ArrayList<>();
     // add jar from local repo
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index 3b14eed..4afb484 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -101,11 +101,12 @@ public class SparkRInterpreter extends Interpreter {
       ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
     }
     ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
-    ZeppelinRContext.setZeppelinContext((SparkZeppelinContext) sparkInterpreter.getZeppelinContext());
+    ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());
 
     zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, SparkRBackend.port(), sparkVersion, timeout, this);
     try {
       zeppelinR.open();
+      logger.info("ZeppelinR is opened successfully.");
     } catch (IOException e) {
       throw new InterpreterException("Exception while opening SparkRInterpreter", e);
     }
@@ -167,7 +168,7 @@ public class SparkRInterpreter extends Interpreter {
 
         return new InterpreterResult(
             rDisplay.code(),
-            rDisplay.type(),
+            rDisplay.typ(),
             rDisplay.content()
         );
       } else {
@@ -183,8 +184,9 @@ public class SparkRInterpreter extends Interpreter {
   }
 
   @Override
-  public void close() {
+  public void close() throws InterpreterException {
     zeppelinR.close();
+    this.sparkInterpreter.close();
   }
 
   @Override
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
index 744e532..723a983 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/Utils.java
@@ -39,7 +39,6 @@ class Utils {
           "%html <font color=\"red\">Spark lower than 2.2 is deprecated, " +
           "if you don't want to see this message, please set " +
           "zeppelin.spark.deprecateMsg.show to false.</font>";
-  private static final String SCALA_COMPILER_VERSION = evaluateScalaCompilerVersion();
 
   static Object invokeMethod(Object o, String name) {
     return invokeMethod(o, name, new Class[]{}, new Object[]{});
@@ -106,45 +105,6 @@ class Utils {
     }
   }
 
-  static boolean isScala2_11() {
-    return !isScala2_10();
-  }
-  
-  static boolean isCompilerAboveScala2_11_7() {
-    if (isScala2_10() || SCALA_COMPILER_VERSION == null) {
-      return false;
-    }
-    Pattern p = Pattern.compile("([0-9]+)[.]([0-9]+)[.]([0-9]+)");
-    Matcher m = p.matcher(SCALA_COMPILER_VERSION);
-    if (m.matches()) {
-      int major = Integer.parseInt(m.group(1));
-      int minor = Integer.parseInt(m.group(2));
-      int bugfix = Integer.parseInt(m.group(3));
-      return (major > 2 || (major == 2 && minor > 11) || (major == 2 && minor == 11 && bugfix > 7));
-    }
-    return false;
-  }
-
-  private static String evaluateScalaCompilerVersion() {
-    String version = null;
-    try {
-      Properties p = new Properties();
-      Class<?> completionClass = findClass("scala.tools.nsc.interpreter.JLineCompletion");
-      if (completionClass != null) {
-        try (java.io.InputStream in = completionClass.getClass()
-          .getResourceAsStream("/compiler.properties")) {
-          p.load(in);
-          version = p.getProperty("version.number");
-        } catch (java.io.IOException e) {
-          logger.error("Failed to evaluate Scala compiler version", e);
-        }
-      }
-    } catch (RuntimeException e) {
-      logger.error("Failed to evaluate Scala compiler version", e);
-    }
-    return version;
-  }
-
   static boolean isSpark2() {
     try {
       Class.forName("org.apache.spark.sql.SparkSession");
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
index 71f3568..60c5b17 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java
@@ -151,7 +151,7 @@ public class ZeppelinR implements ExecuteResultHandler {
       cmd.addArgument(SparkRBackend.socketSecret());
     }
     // dump out the R command to facilitate manually running it, e.g. for fault diagnosis purposes
-    logger.debug(cmd.toString());
+    logger.debug("R Command: " + cmd.toString());
 
     executor = new DefaultExecutor();
     outputStream = new SparkRInterpreterOutputStream(logger, sparkRInterpreter);
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
index 80ea03b..59a1e6f 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.spark;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
 
 /**
  * Contains the Spark and Zeppelin Contexts made available to SparkR.
@@ -27,7 +28,7 @@ import org.apache.spark.sql.SQLContext;
 public class ZeppelinRContext {
   private static SparkContext sparkContext;
   private static SQLContext sqlContext;
-  private static SparkZeppelinContext zeppelinContext;
+  private static BaseZeppelinContext zeppelinContext;
   private static Object sparkSession;
   private static JavaSparkContext javaSparkContext;
 
@@ -35,7 +36,7 @@ public class ZeppelinRContext {
     ZeppelinRContext.sparkContext = sparkContext;
   }
 
-  public static void setZeppelinContext(SparkZeppelinContext zeppelinContext) {
+  public static void setZeppelinContext(BaseZeppelinContext zeppelinContext) {
     ZeppelinRContext.zeppelinContext = zeppelinContext;
   }
 
@@ -55,7 +56,7 @@ public class ZeppelinRContext {
     return sqlContext;
   }
 
-  public static SparkZeppelinContext getZeppelinContext() {
+  public static BaseZeppelinContext getZeppelinContext() {
     return zeppelinContext;
   }
 
diff --git a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala
index a9014c2..9880691 100644
--- a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala
+++ b/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala
@@ -29,7 +29,7 @@ import org.jsoup.safety.Whitelist
 import scala.collection.JavaConversions._
 import scala.util.matching.Regex
 
-case class RDisplay(content: String, `type`: Type, code: Code)
+class RDisplay(val content: String, val typ: Type, val code: Code)
 
 object ZeppelinRDisplay {
 
@@ -42,7 +42,7 @@ object ZeppelinRDisplay {
 
     val body = document.body()
 
-    if (body.getElementsByTag("p").isEmpty) return RDisplay(body.html(), HTML, SUCCESS)
+    if (body.getElementsByTag("p").isEmpty) return new RDisplay(body.html(), HTML, SUCCESS)
 
     val bodyHtml = body.html()
 
@@ -70,21 +70,21 @@ object ZeppelinRDisplay {
     // remove HTML tag while preserving whitespaces and newlines
     val text = Jsoup.clean(body.html(), "",
       Whitelist.none(), new OutputSettings().prettyPrint(false))
-    RDisplay(text, TEXT, SUCCESS)
+    new RDisplay(text, TEXT, SUCCESS)
   }
 
   private def tableDisplay(body: Element): RDisplay = {
     val p = body.getElementsByTag("p").first().html.replace("“%table " , "").replace("”", "")
     val r = (pattern findFirstIn p).getOrElse("")
     val table = p.replace(r, "").replace("\\t", "\t").replace("\\n", "\n")
-    RDisplay(table, TABLE, SUCCESS)
+    new RDisplay(table, TABLE, SUCCESS)
   }
 
   private def imgDisplay(body: Element): RDisplay = {
     val p = body.getElementsByTag("p").first().html.replace("“%img " , "").replace("”", "")
     val r = (pattern findFirstIn p).getOrElse("")
     val img = p.replace(r, "")
-    RDisplay(img, IMG, SUCCESS)
+    new RDisplay(img, IMG, SUCCESS)
   }
 
   private def htmlDisplay(body: Element, imageWidth: String): RDisplay = {
@@ -112,6 +112,6 @@ object ZeppelinRDisplay {
       image.attr("width", imageWidth)
     }
 
-    RDisplay(body.html, HTML, SUCCESS)
+    new RDisplay(body.html, HTML, SUCCESS)
   }
 }
diff --git a/spark/interpreter/src/test/resources/log4j.properties b/spark/interpreter/src/test/resources/log4j.properties
index edd13e4..38ba9e1 100644
--- a/spark/interpreter/src/test/resources/log4j.properties
+++ b/spark/interpreter/src/test/resources/log4j.properties
@@ -44,7 +44,7 @@ log4j.logger.DataNucleus.Datastore=ERROR
 log4j.logger.org.hibernate.type=ALL
 
 log4j.logger.org.apache.zeppelin.interpreter=WARN
-log4j.logger.org.apache.zeppelin.spark=INFO
+log4j.logger.org.apache.zeppelin.spark=DEBUG
 
 log4j.logger.org.apache.zeppelin.python=DEBUG
 log4j.logger.org.apache.spark.repl.Main=WARN
diff --git a/spark/pom.xml b/spark/pom.xml
index 91b9c2f..b83281d 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -44,8 +44,10 @@
         <!--plugin versions-->
         <plugin.scala.version>2.15.2</plugin.scala.version>
         <!-- spark versions -->
-        <spark.version>2.2.0</spark.version>
-        <py4j.version>0.10.4</py4j.version>
+        <spark.version>2.2.3</spark.version>
+        <spark.scala.version>2.11</spark.scala.version>
+        <spark.scala.binary.version>2.11.12</spark.scala.binary.version>
+        <py4j.version>0.10.7</py4j.version>
 
         <spark.archive>spark-${spark.version}</spark.archive>
         <spark.src.download.url>
@@ -61,6 +63,7 @@
         <module>spark-scala-parent</module>
         <module>scala-2.10</module>
         <module>scala-2.11</module>
+        <module>scala-2.12</module>
         <module>spark-dependencies</module>
         <module>spark-shims</module>
         <module>spark1-shims</module>
@@ -85,13 +88,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_${scala.binary.version}</artifactId>
-            <version>${scalatest.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
@@ -118,24 +114,6 @@
             </plugin>
 
             <plugin>
-                <groupId>org.scalatest</groupId>
-                <artifactId>scalatest-maven-plugin</artifactId>
-                <configuration>
-                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                    <junitxml>.</junitxml>
-                    <filereports>WDF TestSuite.txt</filereports>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>test</id>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-
-            <plugin>
                 <groupId>net.alchim31.maven</groupId>
                 <artifactId>scala-maven-plugin</artifactId>
                 <version>3.2.2</version>
@@ -189,10 +167,40 @@
 
     <profiles>
 
+        <!-- profile spark-scala-x only affect the unit test in spark/interpreter module -->
+
+        <profile>
+            <id>spark-scala-2.12</id>
+            <properties>
+                <spark.scala.version>2.12.7</spark.scala.version>
+                <spark.scala.binary.version>2.12</spark.scala.binary.version>
+            </properties>
+        </profile>
+
+        <profile>
+            <id>spark-scala-2.11</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <properties>
+                <spark.scala.version>2.11.8</spark.scala.version>
+                <spark.scala.binary.version>2.11</spark.scala.binary.version>
+            </properties>
+        </profile>
+
+        <profile>
+            <id>spark-scala-2.10</id>
+            <properties>
+                <spark.scala.version>2.10.5</spark.scala.version>
+                <spark.scala.binary.version>2.10</spark.scala.binary.version>
+            </properties>
+        </profile>
+
+        <!-- profile spark-x only affect the embedded spark version in zeppelin distribution -->
         <profile>
             <id>spark-2.4</id>
             <properties>
-                <spark.version>2.4.0</spark.version>
+                <spark.version>2.4.3</spark.version>
                 <protobuf.version>2.5.0</protobuf.version>
                 <py4j.version>0.10.7</py4j.version>
             </properties>
@@ -201,7 +209,7 @@
         <profile>
             <id>spark-2.3</id>
             <properties>
-                <spark.version>2.3.2</spark.version>
+                <spark.version>2.3.3</spark.version>
                 <protobuf.version>2.5.0</protobuf.version>
                 <py4j.version>0.10.7</py4j.version>
             </properties>
@@ -213,8 +221,8 @@
                 <activeByDefault>true</activeByDefault>
             </activation>
             <properties>
-                <spark.version>2.2.1</spark.version>
-                <py4j.version>0.10.4</py4j.version>
+                <spark.version>2.2.3</spark.version>
+                <py4j.version>0.10.7</py4j.version>
             </properties>
         </profile>
 
diff --git a/spark/scala-2.10/pom.xml b/spark/scala-2.10/pom.xml
index 2c8e8d9..f0e2f5b 100644
--- a/spark/scala-2.10/pom.xml
+++ b/spark/scala-2.10/pom.xml
@@ -33,10 +33,29 @@
   <name>Zeppelin: Spark Interpreter Scala_2.10</name>
 
   <properties>
-    <spark.version>2.2.0</spark.version>
-    <scala.version>2.10.5</scala.version>
-    <scala.binary.version>2.10</scala.binary.version>
-    <scala.compile.version>${scala.version}</scala.compile.version>
+    <spark.version>2.2.3</spark.version>
+    <spark.scala.version>2.10.5</spark.scala.version>
+    <spark.scala.binary.version>2.10</spark.scala.binary.version>
+    <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
   </properties>
 
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index 9d2ac83..eb0e297 100644
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -18,14 +18,16 @@
 package org.apache.zeppelin.spark
 
 import java.io.File
+import java.net.URLClassLoader
 import java.nio.file.{Files, Paths}
+import java.util.Properties
 
 import org.apache.spark.SparkConf
 import org.apache.spark.repl.SparkILoop
 import org.apache.spark.repl.SparkILoop._
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup}
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.tools.nsc.Settings
@@ -36,8 +38,10 @@ import scala.tools.nsc.interpreter._
   */
 class SparkScala210Interpreter(override val conf: SparkConf,
                                override val depFiles: java.util.List[String],
-                               override val printReplOutput: java.lang.Boolean)
-  extends BaseSparkScalaInterpreter(conf, depFiles, printReplOutput) {
+                               override val properties: Properties,
+                               override val interpreterGroup: InterpreterGroup,
+                               override val sparkInterpreterClassLoader: URLClassLoader)
+  extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
   lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
@@ -64,10 +68,10 @@ class SparkScala210Interpreter(override val conf: SparkConf,
     }
 
     val settings = new Settings()
-    settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
+    settings.embeddedDefaults(sparkInterpreterClassLoader)
     settings.usejavacp.value = true
     settings.classpath.value = getUserJars.mkString(File.pathSeparator)
-    if (printReplOutput) {
+    if (properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean) {
       Console.setOut(interpreterOutput)
     }
     sparkILoop = new SparkILoop()
@@ -80,13 +84,18 @@ class SparkScala210Interpreter(override val conf: SparkConf,
       "org$apache$spark$repl$SparkILoop$$chooseReader",
       Array(settings.getClass), Array(settings)).asInstanceOf[InteractiveReader]
     setDeclaredField(sparkILoop, "org$apache$spark$repl$SparkILoop$$in", reader)
-    scalaCompleter = reader.completion.completer()
+    this.scalaCompletion = reader.completion
 
     createSparkContext()
+    createZeppelinContext()
   }
 
-  override def close(): Unit = {
-    super.close()
+  protected def completion(buf: String,
+                                    cursor: Int,
+                                    context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates
+      .map(e => new InterpreterCompletion(e, e, null))
+    scala.collection.JavaConversions.seqAsJavaList(completions)
   }
 
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =
diff --git a/spark/scala-2.11/pom.xml b/spark/scala-2.11/pom.xml
index fcee7c4..23b7461 100644
--- a/spark/scala-2.11/pom.xml
+++ b/spark/scala-2.11/pom.xml
@@ -33,10 +33,29 @@
   <name>Zeppelin: Spark Interpreter Scala_2.11</name>
 
   <properties>
-    <spark.version>2.4.0</spark.version>
-    <scala.version>2.11.12</scala.version>
-    <scala.binary.version>2.11</scala.binary.version>
-    <scala.compile.version>${scala.version}</scala.compile.version>
+    <spark.version>2.4.3</spark.version>
+    <spark.scala.version>2.11.12</spark.scala.version>
+    <spark.scala.binary.version>2.11</spark.scala.binary.version>
+    <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
   </properties>
 
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
index 8465145..7d99a0b 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
@@ -20,12 +20,13 @@ package org.apache.zeppelin.spark
 import java.io.{BufferedReader, File}
 import java.net.URLClassLoader
 import java.nio.file.{Files, Paths}
+import java.util.Properties
 
 import org.apache.spark.SparkConf
 import org.apache.spark.repl.SparkILoop
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup}
 import org.slf4j.LoggerFactory
 import org.slf4j.Logger
 
@@ -37,8 +38,10 @@ import scala.tools.nsc.interpreter._
   */
 class SparkScala211Interpreter(override val conf: SparkConf,
                                override val depFiles: java.util.List[String],
-                               override val printReplOutput: java.lang.Boolean)
-  extends BaseSparkScalaInterpreter(conf, depFiles, printReplOutput) {
+                               override val properties: Properties,
+                               override val interpreterGroup: InterpreterGroup,
+                               override val sparkInterpreterClassLoader: URLClassLoader)
+  extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
   import SparkScala211Interpreter._
 
@@ -66,10 +69,11 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     val settings = new Settings()
     settings.processArguments(List("-Yrepl-class-based",
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
-    settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
+    settings.embeddedDefaults(sparkInterpreterClassLoader)
     settings.usejavacp.value = true
     settings.classpath.value = getUserJars.mkString(File.pathSeparator)
 
+    val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
     val replOut = if (printReplOutput) {
       new JPrintWriter(interpreterOutput, true)
     } else {
@@ -85,18 +89,29 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     sparkILoop.in = reader
     sparkILoop.initializeSynchronous()
     loopPostInit(this)
-    this.scalaCompleter = reader.completion.completer()
+    this.scalaCompletion = reader.completion
 
     createSparkContext()
+    createZeppelinContext()
+  }
+
+  protected override def completion(buf: String,
+                                    cursor: Int,
+                                    context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    val completions = scalaCompletion.completer().complete(buf.substring(0, cursor), cursor).candidates
+      .map(e => new InterpreterCompletion(e, e, null))
+    scala.collection.JavaConversions.seqAsJavaList(completions)
   }
 
   protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
     sparkILoop.beQuietDuring {
-      sparkILoop.bind(name, tpe, value, modifier)
+      val result = sparkILoop.bind(name, tpe, value, modifier)
+      if (result != IR.Success) {
+        throw new RuntimeException("Fail to bind variable: " + name)
+      }
     }
   }
 
-
   override def close(): Unit = {
     super.close()
     if (sparkILoop != null) {
diff --git a/spark/scala-2.10/pom.xml b/spark/scala-2.12/pom.xml
similarity index 53%
copy from spark/scala-2.10/pom.xml
copy to spark/scala-2.12/pom.xml
index 2c8e8d9..086203b 100644
--- a/spark/scala-2.10/pom.xml
+++ b/spark/scala-2.12/pom.xml
@@ -15,9 +15,9 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-<project xmlns="https://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
     <groupId>org.apache.zeppelin</groupId>
     <artifactId>spark-scala-parent</artifactId>
@@ -27,16 +27,35 @@
 
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.zeppelin</groupId>
-  <artifactId>spark-scala-2.10</artifactId>
+  <artifactId>spark-scala-2.12</artifactId>
   <version>0.9.0-SNAPSHOT</version>
   <packaging>jar</packaging>
-  <name>Zeppelin: Spark Interpreter Scala_2.10</name>
+  <name>Zeppelin: Spark Interpreter Scala_2.12</name>
 
   <properties>
-    <spark.version>2.2.0</spark.version>
-    <scala.version>2.10.5</scala.version>
-    <scala.binary.version>2.10</scala.binary.version>
-    <scala.compile.version>${scala.version}</scala.compile.version>
+    <spark.version>2.4.3</spark.version>
+    <spark.scala.version>2.12.8</spark.scala.version>
+    <spark.scala.binary.version>2.12</spark.scala.binary.version>
+    <spark.scala.compile.version>${spark.scala.version}</spark.scala.compile.version>
   </properties>
 
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
 </project>
diff --git a/spark/scala-2.12/spark-scala-parent b/spark/scala-2.12/spark-scala-parent
new file mode 120000
index 0000000..e5e899e
--- /dev/null
+++ b/spark/scala-2.12/spark-scala-parent
@@ -0,0 +1 @@
+../spark-scala-parent
\ No newline at end of file
diff --git a/zeppelin-interpreter-integration/src/test/resources/log4j.properties b/spark/scala-2.12/src/main/resources/log4j.properties
similarity index 92%
copy from zeppelin-interpreter-integration/src/test/resources/log4j.properties
copy to spark/scala-2.12/src/main/resources/log4j.properties
index 50300f1..0c90b21 100644
--- a/zeppelin-interpreter-integration/src/test/resources/log4j.properties
+++ b/spark/scala-2.12/src/main/resources/log4j.properties
@@ -27,12 +27,13 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n
 
 # Root logger option
 log4j.rootLogger=INFO, stdout
-
+ 
 #mute some noisy guys
 log4j.logger.org.apache.hadoop.mapred=WARN
 log4j.logger.org.apache.hadoop.hive.ql=WARN
 log4j.logger.org.apache.hadoop.hive.metastore=WARN
 log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN
+log4j.logger.org.apache.zeppelin.scheduler=WARN
 
 log4j.logger.org.quartz=WARN
 log4j.logger.DataNucleus=WARN
@@ -41,6 +42,9 @@ log4j.logger.DataNucleus.Datastore=ERROR
 
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
-log4j.logger.org.apache.hadoop=WARN
 
 log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.logger.org.apache.zeppelin.spark=DEBUG
+
+
+log4j.logger.org.apache.spark.repl.Main=INFO
diff --git a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
similarity index 50%
copy from spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
copy to spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
index 8465145..a0fe7f1 100644
--- a/spark/scala-2.11/src/main/scala/org/apache/zeppelin/spark/SparkScala211Interpreter.scala
+++ b/spark/scala-2.12/src/main/scala/org/apache/zeppelin/spark/SparkScala212Interpreter.scala
@@ -20,12 +20,13 @@ package org.apache.zeppelin.spark
 import java.io.{BufferedReader, File}
 import java.net.URLClassLoader
 import java.nio.file.{Files, Paths}
+import java.util.Properties
 
 import org.apache.spark.SparkConf
 import org.apache.spark.repl.SparkILoop
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterGroup}
 import org.slf4j.LoggerFactory
 import org.slf4j.Logger
 
@@ -33,14 +34,14 @@ import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
 /**
-  * SparkInterpreter for scala-2.11
+  * SparkInterpreter for scala-2.12
   */
-class SparkScala211Interpreter(override val conf: SparkConf,
+class SparkScala212Interpreter(override val conf: SparkConf,
                                override val depFiles: java.util.List[String],
-                               override val printReplOutput: java.lang.Boolean)
-  extends BaseSparkScalaInterpreter(conf, depFiles, printReplOutput) {
-
-  import SparkScala211Interpreter._
+                               override val properties: Properties,
+                               override val interpreterGroup: InterpreterGroup,
+                               override val sparkInterpreterClassLoader: URLClassLoader)
+  extends BaseSparkScalaInterpreter(conf, depFiles, properties, interpreterGroup, sparkInterpreterClassLoader) {
 
   lazy override val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
@@ -58,18 +59,15 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     val outputDir = Files.createTempDirectory(Paths.get(rootDir), "spark").toFile
     outputDir.deleteOnExit()
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
-    startHttpServer(outputDir).foreach { case (server, uri) =>
-      sparkHttpServer = server
-      conf.set("spark.repl.class.uri", uri)
-    }
 
     val settings = new Settings()
     settings.processArguments(List("-Yrepl-class-based",
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}"), true)
-    settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
+    settings.embeddedDefaults(sparkInterpreterClassLoader)
     settings.usejavacp.value = true
     settings.classpath.value = getUserJars.mkString(File.pathSeparator)
 
+    val printReplOutput = properties.getProperty("zeppelin.spark.printREPLOutput", "true").toBoolean
     val replOut = if (printReplOutput) {
       new JPrintWriter(interpreterOutput, true)
     } else {
@@ -78,21 +76,32 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     sparkILoop = new SparkILoop(None, replOut)
     sparkILoop.settings = settings
     sparkILoop.createInterpreter()
-
-    val in0 = getField(sparkILoop, "scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]]
+    val in0 = getDeclareField(sparkILoop, "in0").asInstanceOf[Option[BufferedReader]]
     val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r, replOut, interactive = true))
 
     sparkILoop.in = reader
     sparkILoop.initializeSynchronous()
-    loopPostInit(this)
-    this.scalaCompleter = reader.completion.completer()
+    sparkILoop.in.postInit()
+    this.scalaCompletion = reader.completion
 
     createSparkContext()
+    createZeppelinContext()
+  }
+
+  protected override def completion(buf: String,
+                           cursor: Int,
+                           context: InterpreterContext): java.util.List[InterpreterCompletion] = {
+    val completions = scalaCompletion.complete(buf.substring(0, cursor), cursor).candidates
+      .map(e => new InterpreterCompletion(e, e, null))
+    scala.collection.JavaConversions.seqAsJavaList(completions)
   }
 
   protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit = {
     sparkILoop.beQuietDuring {
-      sparkILoop.bind(name, tpe, value, modifier)
+      val result = sparkILoop.bind(name, tpe, value, modifier)
+      if (result != IR.Success) {
+        throw new RuntimeException("Fail to bind variable: " + name)
+      }
     }
   }
 
@@ -108,72 +117,3 @@ class SparkScala211Interpreter(override val conf: SparkConf,
     sparkILoop.interpret(code)
 
 }
-
-private object SparkScala211Interpreter {
-
-  /**
-    * This is a hack to call `loopPostInit` at `ILoop`. At higher version of Scala such
-    * as 2.11.12, `loopPostInit` became a nested function which is inaccessible. Here,
-    * we redefine `loopPostInit` at Scala's 2.11.8 side and ignore `loadInitFiles` being called at
-    * Scala 2.11.12 since here we do not have to load files.
-    *
-    * Both methods `loopPostInit` and `unleashAndSetPhase` are redefined, and `phaseCommand` and
-    * `asyncMessage` are being called via reflection since both exist in Scala 2.11.8 and 2.11.12.
-    *
-    * Please see the codes below:
-    * https://github.com/scala/scala/blob/v2.11.8/src/repl/scala/tools/nsc/interpreter/ILoop.scala
-    * https://github.com/scala/scala/blob/v2.11.12/src/repl/scala/tools/nsc/interpreter/ILoop.scala
-    *
-    * See also ZEPPELIN-3810.
-    */
-  private def loopPostInit(interpreter: SparkScala211Interpreter): Unit = {
-    import StdReplTags._
-    import scala.reflect.classTag
-    import scala.reflect.io
-
-    val sparkILoop = interpreter.sparkILoop
-    val intp = sparkILoop.intp
-    val power = sparkILoop.power
-    val in = sparkILoop.in
-
-    def loopPostInit() {
-      // Bind intp somewhere out of the regular namespace where
-      // we can get at it in generated code.
-      intp.quietBind(NamedParam[IMain]("$intp", intp)(tagOfIMain, classTag[IMain]))
-      // Auto-run code via some setting.
-      (replProps.replAutorunCode.option
-        flatMap (f => io.File(f).safeSlurp())
-        foreach (intp quietRun _)
-        )
-      // classloader and power mode setup
-      intp.setContextClassLoader()
-      if (isReplPower) {
-        replProps.power setValue true
-        unleashAndSetPhase()
-        asyncMessage(power.banner)
-      }
-      // SI-7418 Now, and only now, can we enable TAB completion.
-      in.postInit()
-    }
-
-    def unleashAndSetPhase() = if (isReplPower) {
-      power.unleash()
-      intp beSilentDuring phaseCommand("typer") // Set the phase to "typer"
-    }
-
-    def phaseCommand(name: String): Results.Result = {
-      interpreter.callMethod(
-        sparkILoop,
-        "scala$tools$nsc$interpreter$ILoop$$phaseCommand",
-        Array(classOf[String]),
-        Array(name)).asInstanceOf[Results.Result]
-    }
-
-    def asyncMessage(msg: String): Unit = {
-      interpreter.callMethod(
-        sparkILoop, "asyncMessage", Array(classOf[String]), Array(msg))
-    }
-
-    loopPostInit()
-  }
-}
diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml
index b9d8e19..dcd8ff6 100644
--- a/spark/spark-dependencies/pom.xml
+++ b/spark/spark-dependencies/pom.xml
@@ -56,10 +56,39 @@
   </properties>
 
   <dependencies>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-interpreter</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-scala-2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-scala-2.11</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-scala-2.12</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Spark -->
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <artifactId>spark-core_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
       <exclusions>
         <exclusion>
@@ -71,31 +100,31 @@
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-repl_${scala.binary.version}</artifactId>
+      <artifactId>spark-repl_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <artifactId>spark-sql_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-hive_${scala.binary.version}</artifactId>
+      <artifactId>spark-hive_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <artifactId>spark-streaming_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
 
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <artifactId>spark-catalyst_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
 
@@ -105,11 +134,10 @@
       <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
-    
-    <!-- yarn (not supported for Spark v1.5.0 or higher) -->
+
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-yarn_${scala.binary.version}</artifactId>
+      <artifactId>spark-yarn_${spark.scala.binary.version}</artifactId>
       <version>${spark.version}</version>
     </dependency>
 
diff --git a/spark/spark-scala-parent/pom.xml b/spark/spark-scala-parent/pom.xml
index 50bedc7..b19afb6 100644
--- a/spark/spark-scala-parent/pom.xml
+++ b/spark/spark-scala-parent/pom.xml
@@ -36,15 +36,15 @@
 
     <properties>
         <spark.version>2.4.0</spark.version>
-        <scala.binary.version>2.11</scala.binary.version>
-        <scala.version>2.11.12</scala.version>
-        <scala.compile.version>${scala.binary.version}</scala.compile.version>
+        <spark.scala.binary.version>2.11</spark.scala.binary.version>
+        <spark.scala.version>2.11.12</spark.scala.version>
+        <saprk.scala.compile.version>${spark.scala.binary.version}</saprk.scala.compile.version>
     </properties>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.zeppelin</groupId>
-            <artifactId>zeppelin-interpreter</artifactId>
+            <artifactId>spark-interpreter</artifactId>
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -57,21 +57,34 @@
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-repl_${scala.binary.version}</artifactId>
+            <artifactId>spark-repl_${spark.scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <artifactId>spark-core_${spark.scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.thoughtworks.paranamer</groupId>
+                    <artifactId>paranamer</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.thoughtworks.paranamer</groupId>
+            <artifactId>paranamer</artifactId>
+            <version>2.8</version>
+            <scope>runtime</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <artifactId>spark-hive_${spark.scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -79,21 +92,21 @@
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-compiler</artifactId>
-            <version>${scala.version}</version>
+            <version>${spark.scala.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
-            <version>${scala.version}</version>
+            <version>${spark.scala.version}</version>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-reflect</artifactId>
-            <version>${scala.version}</version>
+            <version>${spark.scala.version}</version>
             <scope>provided</scope>
         </dependency>
 
@@ -113,8 +126,24 @@
         </dependency>
     </dependencies>
 
+
     <build>
+        <pluginManagement>
+
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-clean-plugin</artifactId>
+                <version>${plugin.clean.version}</version>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>${project.basedir}/../../interpreter/spark/scala-${spark.scala.binary.version}</directory>
+                            <followSymlinks>false</followSymlinks>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
 
             <plugin>
                 <artifactId>maven-resources-plugin</artifactId>
@@ -215,7 +244,7 @@
                     </execution>
                 </executions>
                 <configuration>
-                    <scalaVersion>${scala.compile.version}</scalaVersion>
+                    <scalaVersion>${spark.scala.version}</scalaVersion>
                     <args>
                         <arg>-unchecked</arg>
                         <arg>-deprecation</arg>
@@ -237,7 +266,17 @@
                 </configuration>
             </plugin>
 
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.3.1</version>
+                <configuration>
+                    <outputDirectory>${project.basedir}/../../interpreter/spark/scala-${spark.scala.binary.version}</outputDirectory>
+                </configuration>
+            </plugin>
+
         </plugins>
+        </pluginManagement>
     </build>
 
 </project>
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 3a2cd0b..421d85a 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -24,14 +24,13 @@ import java.nio.file.Paths
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.{JobProgressUtil, SparkConf, SparkContext}
-import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
-import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterResult}
+import org.apache.zeppelin.interpreter.{BaseZeppelinContext, InterpreterContext, InterpreterGroup, InterpreterResult}
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
-import scala.tools.nsc.interpreter.Completion.ScalaCompleter
+import scala.tools.nsc.interpreter.Completion
 import scala.util.control.NonFatal
 
 /**
@@ -40,10 +39,15 @@ import scala.util.control.NonFatal
   *
   * @param conf
   * @param depFiles
+  * @param properties
+  * @param interpreterGroup
   */
 abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
                                          val depFiles: java.util.List[String],
-                                         val printReplOutput: java.lang.Boolean) {
+                                         val properties: java.util.Properties,
+                                         val interpreterGroup: InterpreterGroup,
+                                         val sparkInterpreterClassLoader: URLClassLoader)
+  extends AbstractSparkScalaInterpreter() {
 
   protected lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
@@ -59,7 +63,9 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
   protected var sparkUrl: String = _
 
-  protected var scalaCompleter: ScalaCompleter = _
+  protected var scalaCompletion: Completion = _
+
+  protected var z: SparkZeppelinContext = _
 
   protected val interpreterOutput: InterpreterOutputStream
 
@@ -139,18 +145,20 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
   protected def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result
 
-  protected def completion(buf: String,
-                           cursor: Int,
-                           context: InterpreterContext): java.util.List[InterpreterCompletion] = {
-    val completions = scalaCompleter.complete(buf.substring(0, cursor), cursor).candidates
-      .map(e => new InterpreterCompletion(e, e, null))
-    scala.collection.JavaConversions.seqAsJavaList(completions)
-  }
-
   protected def getProgress(jobGroup: String, context: InterpreterContext): Int = {
     JobProgressUtil.progress(sc, jobGroup)
   }
 
+  override def getSparkContext: SparkContext = sc
+
+  override def getSqlContext: SQLContext = sqlContext
+
+  override def getSparkSession: AnyRef = sparkSession
+
+  override def getSparkUrl: String = sparkUrl
+
+  override def getZeppelinContext: BaseZeppelinContext = z
+
   protected def bind(name: String, tpe: String, value: Object, modifier: List[String]): Unit
 
   // for use in java side
@@ -161,20 +169,18 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     bind(name, tpe, value, modifier.asScala.toList)
 
   protected def close(): Unit = {
-    if (BaseSparkScalaInterpreter.sessionNum.decrementAndGet() == 0) {
-      if (sc != null) {
-        sc.stop()
-      }
-      if (sparkHttpServer != null) {
-        sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
-      }
-      sc = null
-      sqlContext = null
-      if (sparkSession != null) {
-        sparkSession.getClass.getMethod("stop").invoke(sparkSession)
-        sparkSession = null
-      }
+    if (sparkHttpServer != null) {
+      sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
+    }
+    if (sc != null) {
+      sc.stop()
+    }
+    sc = null
+    if (sparkSession != null) {
+      sparkSession.getClass.getMethod("stop").invoke(sparkSession)
+      sparkSession = null
     }
+    sqlContext = null
   }
 
   protected def createSparkContext(): Unit = {
@@ -295,6 +301,16 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     interpret("print(\"\")")
   }
 
+  protected def createZeppelinContext(): Unit = {
+    val sparkShims = SparkShims.getInstance(sc.version, properties)
+    sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get)
+
+    z = new SparkZeppelinContext(sc, sparkShims,
+      interpreterGroup.getInterpreterHookRegistry,
+      properties.getProperty("zeppelin.spark.maxResult").toInt)
+    bind("z", z.getClass.getCanonicalName, z, List("""@transient"""))
+  }
+
   private def isSparkSessionPresent(): Boolean = {
     try {
       Class.forName("org.apache.spark.sql.SparkSession")
@@ -392,6 +408,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
         classLoader = classLoader.getParent
       }
     }
+
+    extraJars ++= sparkInterpreterClassLoader.getURLs().map(_.toString)
     LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
     extraJars
   }
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
index 517bed0..3b44c9a 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/JobProgressUtil.scala
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark
+package org.apache.zeppelin.spark
+
+import org.apache.spark.SparkContext
 
 object JobProgressUtil {
 
diff --git a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
similarity index 93%
rename from spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
rename to spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
index e80c152..83594d0 100644
--- a/spark/interpreter/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/SparkZeppelinContext.scala
@@ -37,12 +37,11 @@ class SparkZeppelinContext(val sc: SparkContext,
                            val maxResult2: Int) extends BaseZeppelinContext(hooks2, maxResult2) {
 
   private val interpreterClassMap = Map(
-    "spark" -> "org.apache.zeppelin.spark.SparkInterpreter",
-    "sql" -> "org.apache.zeppelin.spark.SparkSqlInterpreter",
-    "dep" -> "org.apache.zeppelin.spark.DepInterpreter",
-    "pyspark" -> "org.apache.zeppelin.spark.PySparkInterpreter",
-    "ipyspark" -> "org.apache.zeppelin.spark.IPySparkInterpreter",
-    "r" -> "org.apache.zeppelin.spark.SparkRInterpreter"
+    ("spark", "org.apache.zeppelin.spark.SparkInterpreter"),
+    ("sql", "org.apache.zeppelin.spark.SparkSqlInterpreter"),
+    ("pyspark", "org.apache.zeppelin.spark.PySparkInterpreter"),
+    ("ipyspark", "org.apache.zeppelin.spark.IPySparkInterpreter"),
+    ("r", "org.apache.zeppelin.spark.SparkRInterpreter")
   )
 
   private val supportedClasses = scala.collection.mutable.ArrayBuffer[Class[_]]()
diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
index efd65fc..10fb9d6 100644
--- a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.ResultMessages;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/spark/spark2-shims/pom.xml b/spark/spark2-shims/pom.xml
index 2c2fe49..79c6a89 100644
--- a/spark/spark2-shims/pom.xml
+++ b/spark/spark2-shims/pom.xml
@@ -35,7 +35,7 @@
 
   <properties>
     <scala.binary.version>2.11</scala.binary.version>
-    <spark.version>2.1.2</spark.version>
+    <spark.version>2.3.2</spark.version>
   </properties>
 
   <dependencies>
diff --git a/zeppelin-display/pom.xml b/zeppelin-display/pom.xml
index ae69cb0..6c227c2 100644
--- a/zeppelin-display/pom.xml
+++ b/zeppelin-display/pom.xml
@@ -103,7 +103,7 @@
         <dependency>
           <groupId>org.scala-lang.modules</groupId>
           <artifactId>scala-xml_${scala.binary.version}</artifactId>
-          <version>1.0.2</version>
+          <version>1.1.0</version>
           <scope>provided</scope>
         </dependency>
       </dependencies>
diff --git a/zeppelin-display/src/main/scala/org/apache/zeppelin/display/angular/AbstractAngularElem.scala b/zeppelin-display/src/main/scala/org/apache/zeppelin/display/angular/AbstractAngularElem.scala
index 7049e7a..66961fd 100644
--- a/zeppelin-display/src/main/scala/org/apache/zeppelin/display/angular/AbstractAngularElem.scala
+++ b/zeppelin-display/src/main/scala/org/apache/zeppelin/display/angular/AbstractAngularElem.scala
@@ -84,7 +84,7 @@ abstract class AbstractAngularElem(val interpreterContext: InterpreterContext,
     newElem(
       interpreterContext,
       name,
-      angularObjects + (name -> angularObject),
+      angularObjects + ((name, angularObject)),
       elem)
   }
 
@@ -147,7 +147,7 @@ abstract class AbstractAngularElem(val interpreterContext: InterpreterContext,
     newElem(
       interpreterContext,
       modelName,
-      angularObjects + (eventName -> angularObject),
+      angularObjects + ((eventName, angularObject)),
       elem)
   }
 
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index 851823d..18f2eb2 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -59,6 +59,20 @@
         </exclusion>
       </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-zengine</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index b7a7da2..ca393f2 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index d5d07a0..bebca31 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -31,6 +31,7 @@ import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.codehaus.plexus.util.xml.pull.XmlPullParserException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -89,6 +90,7 @@ public abstract class SparkIntegrationTest {
     // add jars & packages for testing
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
     sparkInterpreterSetting.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0");
+    sparkInterpreterSetting.setProperty("SPARK_PRINT_LAUNCH_COMMAND", "true");
     MavenXpp3Reader reader = new MavenXpp3Reader();
     Model model = reader.read(new FileReader("pom.xml"));
     sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-interpreter-integration-" + model.getVersion() + ".jar").getAbsolutePath());
@@ -134,7 +136,7 @@ public abstract class SparkIntegrationTest {
     } else {
       interpreterResult = sparkrInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", context);
     }
-    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
     assertTrue(interpreterResult.message().get(0).getData().contains("eruptions waiting"));
   }
@@ -182,6 +184,27 @@ public abstract class SparkIntegrationTest {
     assertEquals(1, response.getApplicationList().size());
 
     interpreterSettingManager.close();
+
+    waitForYarnAppCompleted(30 * 1000);
+  }
+
+  private void waitForYarnAppCompleted(int timeout) throws YarnException {
+    long start = System.currentTimeMillis();
+    boolean yarnAppCompleted = false;
+    while ((System.currentTimeMillis() - start) < timeout) {
+      GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+      GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+      if (response.getApplicationList().isEmpty()) {
+        yarnAppCompleted = true;
+        break;
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+    assertTrue("Yarn app is not completed in " + timeout + " milliseconds.", yarnAppCompleted);
   }
 
   @Test
@@ -206,6 +229,8 @@ public abstract class SparkIntegrationTest {
     assertEquals(1, response.getApplicationList().size());
 
     interpreterSettingManager.close();
+
+    waitForYarnAppCompleted(30 * 1000);
   }
 
   private boolean isSpark2() {
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
index 0441cac..96b484a 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest24.java
@@ -33,7 +33,7 @@ public class SparkIntegrationTest24 extends SparkIntegrationTest{
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.4.0"}
+            {"2.4.3"}
     });
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index 321b94f..9c301b1 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.zeppelin.integration;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
@@ -25,6 +24,7 @@ import org.apache.zeppelin.interpreter.InterpreterNotFoundException;
 import org.apache.zeppelin.interpreter.InterpreterProperty;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.Notebook;
@@ -36,8 +36,6 @@ import org.apache.zeppelin.utils.TestUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +43,6 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.StringReader;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
index b7fadd4..4431f94 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest24.java
@@ -33,7 +33,7 @@ public class ZeppelinSparkClusterTest24 extends ZeppelinSparkClusterTest {
   @Parameterized.Parameters
   public static List<Object[]> data() {
     return Arrays.asList(new Object[][]{
-            {"2.4.0"}
+            {"2.4.3"}
     });
   }
 }
diff --git a/zeppelin-interpreter-integration/src/test/resources/log4j.properties b/zeppelin-interpreter-integration/src/test/resources/log4j.properties
index 50300f1..95c474c 100644
--- a/zeppelin-interpreter-integration/src/test/resources/log4j.properties
+++ b/zeppelin-interpreter-integration/src/test/resources/log4j.properties
@@ -44,3 +44,4 @@ log4j.logger.org.hibernate.type=ALL
 log4j.logger.org.apache.hadoop=WARN
 
 log4j.logger.org.apache.zeppelin.interpreter=DEBUG
+log4j.logger.org.apache.zeppelin.util=DEBUG
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ProcessLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ProcessLauncher.java
index 05218ce..bdea797 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ProcessLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/util/ProcessLauncher.java
@@ -93,7 +93,6 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
   }
 
   public void onProcessRunning() {
-    LOGGER.info("Process is running");
     transition(State.RUNNING);
   }
 
diff --git a/zeppelin-plugins/launcher/spark/pom.xml b/zeppelin-plugins/launcher/spark/pom.xml
index 9ea80d2..5a9d41b 100644
--- a/zeppelin-plugins/launcher/spark/pom.xml
+++ b/zeppelin-plugins/launcher/spark/pom.xml
@@ -53,6 +53,16 @@
             <plugin>
                 <artifactId>maven-dependency-plugin</artifactId>
             </plugin>
+
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${plugin.surefire.version}</version>
+                <configuration>
+                    <environmentVariables>
+                        <ZEPPELIN_HOME>${project.basedir}/../../..</ZEPPELIN_HOME>
+                    </environmentVariables>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 1bf446f..ea883a2 100644
--- a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -17,11 +17,23 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
+
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -46,7 +58,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
   }
 
   @Override
-  protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
+  protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) throws IOException {
     Map<String, String> env = super.buildEnvFromProperties(context);
     Properties sparkProperties = new Properties();
     String sparkMaster = getSparkMaster(properties);
@@ -77,34 +89,53 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
       } else {
         sparkProperties.put("spark.files", zConf.getConfDir() + "/log4j_yarn_cluster.properties");
       }
+      sparkProperties.put("spark.yarn.maxAppAttempts", "1");
+    }
+
+
+    if (isYarnMode()
+        && getDeployMode().equals("cluster")) {
+      try {
+        List<String> additionalJars = new ArrayList();
+        Path localRepoPath =
+                Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId());
+        if (Files.exists(localRepoPath) && Files.isDirectory(localRepoPath)) {
+          List<String> localRepoJars = StreamSupport.stream(
+                  Files.newDirectoryStream(localRepoPath, entry -> Files.isRegularFile(entry))
+                          .spliterator(),
+                  false)
+                  .map(jar -> jar.toAbsolutePath().toString()).collect(Collectors.toList());
+          additionalJars.addAll(localRepoJars);
+        }
+
+        String scalaVersion = detectSparkScalaVersion(properties.getProperty("SPARK_HOME"));
+        Path scalaFolder =  Paths.get(zConf.getZeppelinHome(), "/interpreter/spark/scala-" + scalaVersion);
+        List<String> scalaJars = StreamSupport.stream(
+                Files.newDirectoryStream(scalaFolder, entry -> Files.isRegularFile(entry))
+                        .spliterator(),
+                false)
+                .map(jar -> jar.toAbsolutePath().toString()).collect(Collectors.toList());
+        additionalJars.addAll(scalaJars);
+
+        if (sparkProperties.containsKey("spark.jars")) {
+          sparkProperties.put("spark.jars", sparkProperties.getProperty("spark.jars") + "," +
+                  StringUtils.join(additionalJars, ","));
+        } else {
+          sparkProperties.put("spark.jars", StringUtils.join(additionalJars, ","));
+        }
+      } catch (Exception e) {
+        throw new IOException("Cannot make a list of additional jars from localRepo: {}", e);
+      }
     }
+
     for (String name : sparkProperties.stringPropertyNames()) {
       sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
     }
     String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER");
     if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) ||
-        !useProxyUserEnv.equals("false"))) {
+            !useProxyUserEnv.equals("false"))) {
       sparkConfBuilder.append(" --proxy-user " + context.getUserName());
     }
-    Path localRepoPath =
-        Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId());
-    if (isYarnMode()
-        && getDeployMode().equals("cluster")
-        && Files.exists(localRepoPath)
-        && Files.isDirectory(localRepoPath)) {
-      try {
-        StreamSupport.stream(
-                Files.newDirectoryStream(localRepoPath, entry -> Files.isRegularFile(entry))
-                    .spliterator(),
-                false)
-            .map(jar -> jar.toAbsolutePath().toString())
-            .reduce((x, y) -> x.concat(",").concat(y))
-            .ifPresent(extraJars -> sparkConfBuilder.append(" --jars ").append(extraJars));
-      } catch (IOException e) {
-        LOGGER.error("Cannot make a list of additional jars from localRepo: {}", localRepoPath, e);
-      }
-
-    }
 
     env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
 
@@ -137,6 +168,66 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
 
   }
 
+  private String detectSparkScalaVersion(String sparkHome) throws Exception {
+    ProcessBuilder builder = new ProcessBuilder(sparkHome + "/bin/spark-submit", "--version");
+    File processOutputFile = File.createTempFile("zeppelin-spark", ".out");
+    builder.redirectError(processOutputFile);
+    Process process = builder.start();
+    process.waitFor();
+    String processOutput = IOUtils.toString(new FileInputStream(processOutputFile));
+    Pattern pattern = Pattern.compile(".*Using Scala version (.*),.*");
+    Matcher matcher = pattern.matcher(processOutput);
+    if (matcher.find()) {
+      String scalaVersion = matcher.group(1);
+      if (scalaVersion.startsWith("2.10")) {
+        return "2.10";
+      } else if (scalaVersion.startsWith("2.11")) {
+        return "2.11";
+      } else if (scalaVersion.startsWith("2.12")) {
+        return "2.12";
+      } else {
+        throw new Exception("Unsupported scala version: " + scalaVersion);
+      }
+    } else {
+      return detectSparkScalaVersionByReplClass(sparkHome);
+    }
+  }
+
+  private String detectSparkScalaVersionByReplClass(String sparkHome) throws Exception {
+    File sparkLibFolder = new File(sparkHome + "/lib");
+    if (sparkLibFolder.exists()) {
+      // spark 1.6 if spark/lib exists
+      File[] sparkAssemblyJars = new File(sparkHome + "/lib").listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          return name.contains("spark-assembly");
+        }
+      });
+      if (sparkAssemblyJars.length == 0) {
+        throw new Exception("No spark assembly file found in SPARK_HOME: " + sparkHome);
+      }
+      if (sparkAssemblyJars.length > 1) {
+        throw new Exception("Multiple spark assembly file found in SPARK_HOME: " + sparkHome);
+      }
+      URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{sparkAssemblyJars[0].toURI().toURL()});
+      try {
+        urlClassLoader.loadClass("org.apache.spark.repl.SparkCommandLine");
+        return "2.10";
+      } catch (ClassNotFoundException e) {
+        return "2.11";
+      }
+    } else {
+      // spark 2.x if spark/lib doesn't exists
+      File sparkJarsFolder = new File(sparkHome + "/jars");
+      boolean sparkRepl211Exists =
+              Stream.of(sparkJarsFolder.listFiles()).anyMatch(file -> file.getName().contains("spark-repl_2.11"));
+      if (sparkRepl211Exists) {
+        return "2.11";
+      } else {
+        return "2.10";
+      }
+    }
+  }
 
   /**
    * get environmental variable in the following order
@@ -174,27 +265,29 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
   }
 
   private void setupPropertiesForSparkR(Properties sparkProperties) {
-    String sparkHome = getEnv("SPARK_HOME");
-    File sparkRBasePath = null;
-    if (sparkHome == null) {
-      if (!getSparkMaster(properties).startsWith("local")) {
-        throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
-            " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
-            " interpreter setting");
+    if (isYarnMode()) {
+      String sparkHome = getEnv("SPARK_HOME");
+      File sparkRBasePath = null;
+      if (sparkHome == null) {
+        if (!getSparkMaster(properties).startsWith("local")) {
+          throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
+                  " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
+                  " interpreter setting");
+        }
+        String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
+        sparkRBasePath = new File(zeppelinHome,
+                "interpreter" + File.separator + "spark" + File.separator + "R");
+      } else {
+        sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
       }
-      String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
-      sparkRBasePath = new File(zeppelinHome,
-          "interpreter" + File.separator + "spark" + File.separator + "R");
-    } else {
-      sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
-    }
 
-    File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
-    if (sparkRPath.exists() && sparkRPath.isFile()) {
-      mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
-          sparkRPath.getAbsolutePath() + "#sparkr");
-    } else {
-      LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
+      File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
+      if (sparkRPath.exists() && sparkRPath.isFile()) {
+        mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
+                sparkRPath.getAbsolutePath() + "#sparkr");
+      } else {
+        LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
+      }
     }
   }
 
diff --git a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index d7dcd0a..118e7d4 100644
--- a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -17,28 +17,37 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.util.Util;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class SparkInterpreterLauncherTest {
+
+  private String sparkHome;
+  private String zeppelinHome;
+
   @Before
   public void setUp() {
     for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
       System.clearProperty(confVar.getVarName());
     }
+
+    sparkHome = DownloadUtils.downloadSpark("2.3.2");
+    zeppelinHome = ZeppelinConfiguration.create().getZeppelinHome();
   }
 
   @Test
@@ -46,7 +55,7 @@ public class SparkInterpreterLauncherTest {
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty(
         ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
     InterpreterOption option = new InterpreterOption();
@@ -56,8 +65,8 @@ public class SparkInterpreterLauncherTest {
     assertTrue( client instanceof RemoteInterpreterManagedProcess);
     RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
     assertEquals("name", interpreterProcess.getInterpreterSettingName());
-    assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
-    assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
+    assertEquals(zeppelinHome + "/interpreter/groupName", interpreterProcess.getInterpreterDir());
+    assertEquals(zeppelinHome + "/local-repo/groupId", interpreterProcess.getLocalRepoDir());
     assertEquals(10000, interpreterProcess.getConnectTimeout());
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 2);
@@ -69,7 +78,7 @@ public class SparkInterpreterLauncherTest {
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
     properties.setProperty("master", "local[*]");
     properties.setProperty("spark.files", "file_1");
@@ -85,7 +94,7 @@ public class SparkInterpreterLauncherTest {
     assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
     assertEquals(" --master local[*] --conf spark.files='file_1' --conf spark.jars='jar_1'", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
@@ -94,7 +103,7 @@ public class SparkInterpreterLauncherTest {
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
     properties.setProperty("master", "yarn-client");
     properties.setProperty("spark.files", "file_1");
@@ -110,8 +119,15 @@ public class SparkInterpreterLauncherTest {
     assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals(" --master yarn-client --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
+
+    String sparkJars = "'jar_1'";
+    String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
+    String sparkFiles = "'file_1'";
+    assertEquals(" --master yarn-client --conf spark.yarn.dist.archives=" + sparkrZip +
+                    " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
+                    " --conf spark.yarn.isPython=true",
+            interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
   @Test
@@ -119,7 +135,7 @@ public class SparkInterpreterLauncherTest {
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
     properties.setProperty("master", "yarn");
     properties.setProperty("spark.submit.deployMode", "client");
@@ -136,8 +152,16 @@ public class SparkInterpreterLauncherTest {
     assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 2);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
-    assertEquals(" --master yarn --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='client' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
+
+    String sparkJars = "'jar_1'";
+    String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
+    String sparkFiles = "'file_1'";
+    assertEquals(" --master yarn --conf spark.yarn.dist.archives=" + sparkrZip +
+                    " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
+                    " --conf spark.submit.deployMode='client'" +
+                    " --conf spark.yarn.isPython=true",
+            interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
   @Test
@@ -145,7 +169,7 @@ public class SparkInterpreterLauncherTest {
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
     properties.setProperty("master", "yarn-cluster");
     properties.setProperty("spark.files", "file_1");
@@ -161,9 +185,19 @@ public class SparkInterpreterLauncherTest {
     assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 3);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
+
     assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
-    assertEquals(" --master yarn-cluster --conf spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf spark.jars='jar_1' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    String sparkJars = "'jar_1'," +
+            zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar";
+    String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
+    String sparkFiles = "'file_1'," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
+    assertEquals(" --master yarn-cluster --conf spark.yarn.dist.archives=" + sparkrZip +
+                    " --conf spark.yarn.maxAppAttempts=1" +
+                    " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
+                    " --conf spark.yarn.isPython=true" +
+                    " --conf spark.yarn.submit.waitAppCompletion=false",
+            interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
   }
 
   @Test
@@ -171,7 +205,7 @@ public class SparkInterpreterLauncherTest {
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
     properties.setProperty("master", "yarn");
     properties.setProperty("spark.submit.deployMode", "cluster");
@@ -194,9 +228,19 @@ public class SparkInterpreterLauncherTest {
     assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 3);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
     assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
-    assertEquals(" --master yarn --conf spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1 --jars " + Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar").toString(), interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    String sparkJars = "'jar_1'," +
+            Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar").toString() + "," +
+            zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar";
+    String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
+    String sparkFiles = "'file_1'," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
+    assertEquals(" --master yarn --conf spark.yarn.dist.archives=" + sparkrZip +
+            " --conf spark.yarn.maxAppAttempts=1" +
+            " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
+            " --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true" +
+            " --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1",
+            interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
     Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar"));
     FileUtils.deleteDirectory(localRepoPath.toFile());
   }
@@ -206,7 +250,7 @@ public class SparkInterpreterLauncherTest {
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
     SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
-    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("SPARK_HOME", sparkHome);
     properties.setProperty("property_1", "value_1");
     properties.setProperty("master", "yarn");
     properties.setProperty("spark.submit.deployMode", "cluster");
@@ -228,9 +272,19 @@ public class SparkInterpreterLauncherTest {
     assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
     assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
     assertTrue(interpreterProcess.getEnv().size() >= 3);
-    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
     assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
-    assertEquals(" --master yarn --conf spark.files='file_1',.//conf/log4j_yarn_cluster.properties --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+
+    String sparkJars = "'jar_1'," +
+            zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar";
+    String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
+    String sparkFiles = "'file_1'," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
+    assertEquals(" --master yarn --conf spark.yarn.dist.archives=" + sparkrZip +
+                    " --conf spark.yarn.maxAppAttempts=1" +
+                    " --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
+                    " --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true" +
+                    " --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1",
+            interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
     FileUtils.deleteDirectory(localRepoPath.toFile());
   }
 }
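
Side note on the yarn-cluster expectations above: they assume the Scala-version-specific interpreter jar sits under <ZEPPELIN_HOME>/interpreter/spark/scala-2.11/ and is shipped to the cluster through spark.jars. Below is a minimal sketch, in plain Java, of how such a jar could be resolved and loaded through an isolated URLClassLoader. The helper name (loadSparkScalaInterpreter) and the interpreter class constant are placeholders for illustration only, not the code added by this commit.

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    public class ScalaInterpreterLoader {
      // Placeholder: the concrete implementation class is Scala-version specific.
      private static final String SCALA_INTERPRETER_CLASS =
          "org.apache.zeppelin.spark.SparkScala211Interpreter";

      // Resolves interpreter/spark/scala-<version>/ under ZEPPELIN_HOME and loads
      // every jar found there into its own URLClassLoader.
      public static Class<?> loadSparkScalaInterpreter(String zeppelinHome, String scalaVersion)
          throws Exception {
        File scalaDir = new File(zeppelinHome, "interpreter/spark/scala-" + scalaVersion);
        File[] jars = scalaDir.listFiles((dir, name) -> name.endsWith(".jar"));
        if (jars == null || jars.length == 0) {
          throw new IllegalStateException("No interpreter jar found under " + scalaDir);
        }
        URL[] urls = new URL[jars.length];
        for (int i = 0; i < jars.length; i++) {
          urls[i] = jars[i].toURI().toURL();
        }
        ClassLoader loader =
            new URLClassLoader(urls, ScalaInterpreterLoader.class.getClassLoader());
        return Class.forName(SCALA_INTERPRETER_CLASS, true, loader);
      }
    }

In yarn-cluster mode the same jar cannot be resolved from the local ZEPPELIN_HOME on the driver node, which is why the tests above expect it to be appended to spark.jars instead.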
diff --git a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
index 47756c9..df9aa31 100644
--- a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -88,7 +88,7 @@ public class StandardInterpreterLauncher extends InterpreterLauncher {
     }
   }
 
-  protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
+  protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) throws IOException {
     Map<String, String> env = new HashMap<>();
     for (Object key : context.getProperties().keySet()) {
       if (RemoteInterpreterUtils.isEnvString((String) key)) {
diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml
index a9fad2e..9177761 100644
--- a/zeppelin-plugins/pom.xml
+++ b/zeppelin-plugins/pom.xml
@@ -65,6 +65,14 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>zeppelin-zengine</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Test libraries -->
         <dependency>
             <groupId>junit</groupId>
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 35218a2..41d414d 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -75,6 +75,11 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
     </dependency>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/DownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
similarity index 98%
rename from zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/DownloadUtils.java
rename to zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
index 4371b2b..546790d 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/DownloadUtils.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/integration/DownloadUtils.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.zeppelin.integration;
+package org.apache.zeppelin.interpreter.integration;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -65,7 +65,7 @@ public class DownloadUtils {
       LOGGER.info("Skip to download flink as it is already downloaded.");
       return targetFlinkHomeFolder.getAbsolutePath();
     }
-    download("flink", version, "-bin-hadoop27-scala_2.11.tgz");
+    download("flink", version, "-bin-hadoop2.6.tgz");
     return targetFlinkHomeFolder.getAbsolutePath();
   }