You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by li...@apache.org on 2019/11/26 16:06:25 UTC

[submarine] branch master updated: SUBMARINE-277. Support Spark Interpreter add sparkSQL interpreter

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cd62e3  SUBMARINE-277. Support Spark Interpreter add sparkSQL interpreter
6cd62e3 is described below

commit 6cd62e333432e2352e337db819e15e60f84145df
Author: xunix huang <qi...@qq.com>
AuthorDate: Tue Nov 26 22:39:48 2019 +0800

    SUBMARINE-277. Support Spark Interpreter add sparkSQL interpreter
    
    ### What is this PR for?
    Add SparkSQL interpreter and improve the structure of interpreter
    
    ### What type of PR is it?
    Feature and Improvement
    
    ### Todos
    * [*] - add sparkSQL interpreter
    * [*] - add Abstract interpreter
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-277
    
    ### How should this be tested?
    https://travis-ci.org/hhhizzz/submarine/builds/617065629
    
    ### Questions:
    * Does the licenses files need an update? No
    * Are there breaking changes for older versions? Yes, some change in InterpreterProcess
    * Does this needs documentation? No
    
    Author: xunix huang <qi...@qq.com>
    Author: luzhonghao <lu...@163.com>
    
    Closes #107 from hhhizzz/luzhonghao and squashes the following commits:
    
    8dd305d [xunix huang] change exception
    4fc0b34 [xunix huang] change exception
    dfb2e3d [xunix huang] add license
    e31f788 [xunix huang] shade the zeppelin package
    409cdad [xunix huang] some fix
    8ec5df2 [xunix huang] SUBMARINE-277. Support Spark Interpreter add sparkSQL interpreter
    1654fbc [luzhonghao] sparkSqlInterpreter
---
 .gitignore                                         |   6 +
 submarine-commons/commons-cluster/pom.xml          |   2 +-
 submarine-dist/src/assembly/distribution.xml       |   7 -
 .../interpreter/interpreter-engine/pom.xml         |   1 -
 .../submarine/interpreter/AbstractInterpreter.java | 141 +++++++++++++
 .../apache/submarine/interpreter/Interpreter.java  |  19 +-
 .../interpreter/InterpreterException.java          |  42 ++++
 .../submarine/interpreter/InterpreterGroup.java    |  23 ++
 .../submarine/interpreter/InterpreterProcess.java  | 213 ++++++-------------
 .../interpreter/python-interpreter/pom.xml         |   2 +-
 .../submarine/interpreter/PythonInterpreter.java   | 107 +++-------
 .../interpreter/InterpreterClusterTest.java        |   3 +-
 .../interpreter/PythonInterpreterTest.java         |  37 ++--
 .../interpreter/spark-interpreter/README.md        |  30 ++-
 .../interpreter/spark-interpreter/pom.xml          |  56 ++---
 .../submarine/interpreter/SparkInterpreter.java    | 166 ++++-----------
 .../submarine/interpreter/SparkSqlInterpreter.java |  89 ++++++++
 .../interpreter/SparkInterpreterTest.java          | 233 ++++++++-------------
 .../interpreter/SparkSqlInterpreterTest.java       | 204 ++++++++++++++++++
 .../src/test/resources/hive-site.xml               |   7 +
 .../src/test/resources/log4j.properties            |  35 ++++
 21 files changed, 856 insertions(+), 567 deletions(-)

diff --git a/.gitignore b/.gitignore
index 521d78c..f614fea 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,3 +73,9 @@ submarine-workbench/workbench-web-ng/dist
 submarine-workbench/workbench-web-ng/package-lock.json
 submarine-workbench/workbench-web-ng/npm-debug.log*
 submarine-test/e2e/Driver
+
+# interpreter temp files
+derby.log
+submarine-workbench/interpreter/spark-interpreter/metastore_db/
+spark-1.*-bin-hadoop*
+.spark-dist
diff --git a/submarine-commons/commons-cluster/pom.xml b/submarine-commons/commons-cluster/pom.xml
index d0eb4c0..f1dbb22 100644
--- a/submarine-commons/commons-cluster/pom.xml
+++ b/submarine-commons/commons-cluster/pom.xml
@@ -158,7 +158,7 @@
               </filters>
               <transformers>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                  <mainClass>org.apache.submarine.interpreter.InterpreterProcess</mainClass>
+                  <mainClass>org.apache.submarine.interpreter.AbstractInterpreter</mainClass>
                 </transformer>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
               </transformers>
diff --git a/submarine-dist/src/assembly/distribution.xml b/submarine-dist/src/assembly/distribution.xml
index ebd9794..c8cc582 100644
--- a/submarine-dist/src/assembly/distribution.xml
+++ b/submarine-dist/src/assembly/distribution.xml
@@ -149,13 +149,6 @@
       </includes>
     </fileSet>
     <fileSet>
-      <directory>../submarine-workbench/interpreter/spark/scala-2.11</directory>
-      <outputDirectory>/lib/interpreter/spark/scala-2.11</outputDirectory>
-      <includes>
-        <include>spark-scala-2.11-${zeppelin.version}.jar</include>
-      </includes>
-    </fileSet>
-    <fileSet>
       <directory>../submarine-workbench/interpreter/spark-interpreter/target</directory>
       <outputDirectory>/lib/interpreter/spark</outputDirectory>
       <includes>
diff --git a/submarine-workbench/interpreter/interpreter-engine/pom.xml b/submarine-workbench/interpreter/interpreter-engine/pom.xml
index 8d8e838..01d9129 100644
--- a/submarine-workbench/interpreter/interpreter-engine/pom.xml
+++ b/submarine-workbench/interpreter/interpreter-engine/pom.xml
@@ -105,7 +105,6 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
-
   <build>
     <finalName>submarine-${artifactId}-${project.version}</finalName>
     <plugins>
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/AbstractInterpreter.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/AbstractInterpreter.java
new file mode 100644
index 0000000..d319838
--- /dev/null
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/AbstractInterpreter.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Entry point for Submarine Interpreter process.
+ * Accepting thrift connections from Submarine Workbench Server.
+ */
+public abstract class AbstractInterpreter implements Interpreter {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractInterpreter.class);
+
+  protected org.apache.zeppelin.interpreter.Interpreter zeppelinInterpreter;
+
+  private InterpreterContext interpreterContext;
+
+  protected InterpreterContext getIntpContext() {
+    if (this.interpreterContext == null) {
+      this.interpreterContext = InterpreterContext.builder()
+              .setInterpreterOut(new InterpreterOutput(null))
+              .build();
+      InterpreterContext.set(this.interpreterContext);
+    }
+    return this.interpreterContext;
+  }
+
+  public void setInterpreterGroup(InterpreterGroup interpreterGroup) {
+    this.zeppelinInterpreter.setInterpreterGroup(interpreterGroup);
+  }
+
+  public InterpreterGroup getInterpreterGroup() {
+    return this.zeppelinInterpreter.getInterpreterGroup();
+  }
+
+  protected Properties mergeZeppelinInterpreterProperties(Properties properties) {
+    Properties newProps = new Properties();
+
+    for (String key : properties.stringPropertyNames()) {
+      String newKey = key.replace("submarine", "zeppelin");
+      newProps.put(newKey, properties.getProperty(key));
+    }
+
+    return newProps;
+  }
+
+  @Override
+  public InterpreterResult interpret(String code) throws InterpreterException {
+    InterpreterResult interpreterResult = null;
+    try {
+      org.apache.zeppelin.interpreter.InterpreterResult zeplInterpreterResult
+              = this.zeppelinInterpreter.interpret(code, getIntpContext());
+      interpreterResult = new InterpreterResult(zeplInterpreterResult);
+
+      List<InterpreterResultMessage> interpreterResultMessages =
+              getIntpContext().out.toInterpreterResultMessage();
+
+      for (org.apache.zeppelin.interpreter.InterpreterResultMessage message : interpreterResultMessages) {
+        interpreterResult.add(message);
+      }
+    } catch (org.apache.zeppelin.interpreter.InterpreterException | IOException e) {
+      LOG.error(e.getMessage(), e);
+      throw new InterpreterException(e);
+    }
+
+    return interpreterResult;
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+    getIntpContext();
+    try {
+      this.zeppelinInterpreter.open();
+    } catch (org.apache.zeppelin.interpreter.InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+      throw new org.apache.submarine.interpreter.InterpreterException(e);
+    }
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+    try {
+      this.zeppelinInterpreter.close();
+    } catch (org.apache.zeppelin.interpreter.InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+      throw new org.apache.submarine.interpreter.InterpreterException(e);
+    }
+  }
+
+  @Override
+  public void cancel() throws InterpreterException {
+    try {
+      this.zeppelinInterpreter.cancel(getIntpContext());
+    } catch (org.apache.zeppelin.interpreter.InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+      throw new org.apache.submarine.interpreter.InterpreterException(e);
+    }
+  }
+
+  @Override
+  public int getProgress() throws InterpreterException {
+    int process = 0;
+    try {
+      process = this.zeppelinInterpreter.getProgress(getIntpContext());
+    } catch (org.apache.zeppelin.interpreter.InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+      throw new org.apache.submarine.interpreter.InterpreterException(e);
+    }
+    return process;
+  }
+
+  @Override
+  public void addToSession(String session) {
+    this.zeppelinInterpreter.getInterpreterGroup().get(session).add(this.zeppelinInterpreter);
+  }
+}
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
index 9681ef4..ae7cdfb 100644
--- a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/Interpreter.java
@@ -18,12 +18,17 @@
 package org.apache.submarine.interpreter;
 
 public interface Interpreter {
-  void shutdown();
-  boolean isRunning();
-  void open();
-  InterpreterResult interpret(String code);
-  void close();
-  void cancel();
-  int getProgress();
+  void open() throws InterpreterException;
+
+  InterpreterResult interpret(String code) throws InterpreterException;
+
+  void close() throws InterpreterException;
+
+  void cancel() throws InterpreterException;
+
+  int getProgress() throws InterpreterException;
+
   boolean test();
+
+  void addToSession(String session);
 }
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterException.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterException.java
new file mode 100644
index 0000000..636d1fe
--- /dev/null
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+public class InterpreterException extends org.apache.zeppelin.interpreter.InterpreterException {
+
+  public InterpreterException() {
+  }
+
+  public InterpreterException(Throwable e) {
+    super(e);
+  }
+
+  public InterpreterException(String m) {
+    super(m);
+  }
+
+  public InterpreterException(String msg, Throwable t) {
+    super(msg, t);
+  }
+
+  public InterpreterException(String message, Throwable cause, boolean enableSuppression,
+                              boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterGroup.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterGroup.java
new file mode 100644
index 0000000..bfb0d5f
--- /dev/null
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterGroup.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+public class InterpreterGroup extends org.apache.zeppelin.interpreter.InterpreterGroup {
+
+}
diff --git a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
index 0e40020..1e9a50c 100644
--- a/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
+++ b/submarine-workbench/interpreter/interpreter-engine/src/main/java/org/apache/submarine/interpreter/InterpreterProcess.java
@@ -22,101 +22,55 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.submarine.commons.cluster.ClusterClient;
 import org.apache.submarine.commons.cluster.meta.ClusterMeta;
 import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.misc.Signal;
-import sun.misc.SignalHandler;
 
-import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.SocketException;
-import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.UnknownHostException;
 import java.time.LocalDateTime;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 
-/**
- * Entry point for Submarine Interpreter process.
- * Accepting thrift connections from Submarine Workbench Server.
- */
-public class InterpreterProcess extends Thread implements Interpreter {
-  private static final Logger LOG = LoggerFactory.getLogger(InterpreterProcess.class);
+public class InterpreterProcess extends Thread {
 
-  // cluster manager client
-  private ClusterClient clusterClient = ClusterClient.getInstance();
+  private static Logger LOG = LoggerFactory.getLogger(InterpreterProcess.class);
 
-  private SubmarineConfiguration sconf = SubmarineConfiguration.getInstance();
+  private Interpreter interpreter;
 
-  protected String interpreterId;
-
-  private InterpreterProcess interpreterProcess;
+  private static InterpreterProcess process;
 
   private AtomicBoolean isRunning = new AtomicBoolean(false);
 
-  public static void main(String[] args) throws InterruptedException, IOException {
-    String interpreterType = args[0];
-    String interpreterId = args[1];
-    Boolean onlyTest = false;
-    if (args.length == 3 && StringUtils.equals(args[2], "test")) {
-      onlyTest = true;
-    }
-
-    InterpreterProcess interpreterProcess = new InterpreterProcess(interpreterType, interpreterId, onlyTest);
-    interpreterProcess.start();
-
-    // add signal handler
-    Signal.handle(new Signal("TERM"), new SignalHandler() {
-      @Override
-      public void handle(Signal signal) {
-        // clean
-        LOG.info("handle signal:{}", signal);
-      }
-    });
-
-    interpreterProcess.join();
-    System.exit(0);
-  }
+  // cluster manager client
+  private ClusterClient clusterClient = ClusterClient.getInstance();
 
-  @Override
-  public void run() {
-    isRunning.set(true);
-    while (isRunning.get()) {
-      try {
-        // TODO(Xun Liu): Next PR will add Thrift Server in here
-        LOG.info("Mock TServer run ...");
-        sleep(1000);
-      } catch (InterruptedException e) {
-        LOG.error(e.getMessage(), e);
-      }
-    }
-  }
+  private SubmarineConfiguration sconf = SubmarineConfiguration.getInstance();
 
-  public boolean isRunning() {
-    return isRunning.get();
-  }
+  protected String interpreterId;
 
   public InterpreterProcess() { }
 
   public InterpreterProcess(String interpreterType, String interpreterId, Boolean onlyTest)
-      throws IOException {
+          throws IOException {
     this.interpreterId = interpreterId;
-    this.interpreterProcess = loadInterpreterPlugin(interpreterType);
+    try {
+      loadInterpreterPlugin(interpreterType);
+    } catch (InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+      System.exit(e.hashCode());
+    }
 
-    if (true == onlyTest) {
-      boolean testResult = interpreterProcess.test();
+    if (onlyTest) {
+      boolean testResult = interpreter.test();
       LOG.info("Interpreter test result: {}", testResult);
       System.exit(0);
       return;
@@ -151,38 +105,38 @@ public class InterpreterProcess extends Thread implements Interpreter {
   }
 
   // get super interpreter class name
-  private String getSuperInterpreterClassName(String intpName) {
+  private String getSuperInterpreterClassName(String intpName) throws InterpreterException {
     String superIntpClassName = "";
     if (StringUtils.equals(intpName, "python")) {
       superIntpClassName = "org.apache.submarine.interpreter.PythonInterpreter";
     } else if (StringUtils.equals(intpName, "spark")) {
       superIntpClassName = "org.apache.submarine.interpreter.SparkInterpreter";
+    } else if (StringUtils.equals(intpName, "sparksql")) {
+      superIntpClassName = "org.apache.submarine.interpreter.SparkSqlInterpreter";
     } else {
-      superIntpClassName = "org.apache.submarine.interpreter.InterpreterProcess";
+      throw new InterpreterException("cannot recognize the interpreter: " + intpName);
     }
 
     return superIntpClassName;
   }
 
-  public synchronized InterpreterProcess loadInterpreterPlugin(String pluginName)
-      throws IOException {
+  public synchronized void loadInterpreterPlugin(String pluginName)
+          throws IOException, InterpreterException {
 
     LOG.info("Loading Plug name: {}", pluginName);
     String pluginClassName = getSuperInterpreterClassName(pluginName);
     LOG.info("Loading Plug Class name: {}", pluginClassName);
 
     // load plugin from classpath directly first for these builtin Interpreter Plugin.
-    InterpreterProcess intpProcess = loadPluginFromClassPath(pluginClassName, null);
-    if (intpProcess == null) {
+    this.interpreter = loadPluginFromClassPath(pluginClassName, null);
+    if (this.interpreter == null) {
       throw new IOException("Fail to load plugin: " + pluginName);
     }
-
-    return intpProcess;
   }
 
-  private InterpreterProcess loadPluginFromClassPath(
-      String pluginClassName, URLClassLoader pluginClassLoader) {
-    InterpreterProcess intpProcess = null;
+  private Interpreter loadPluginFromClassPath(
+          String pluginClassName, URLClassLoader pluginClassLoader) {
+    Interpreter intpProcess = null;
     try {
       Class<?> clazz = null;
       if (null == pluginClassLoader) {
@@ -202,107 +156,58 @@ public class InterpreterProcess extends Thread implements Interpreter {
         LOG.debug(method.getName());
       }
 
-      intpProcess = (InterpreterProcess) (clazz.getConstructor().newInstance());
+      intpProcess = (Interpreter) (clazz.getConstructor().newInstance());
       return intpProcess;
     } catch (InstantiationException | IllegalAccessException | ClassNotFoundException
-        | NoSuchMethodException | InvocationTargetException e) {
+            | NoSuchMethodException | InvocationTargetException e) {
       LOG.warn("Fail to instantiate InterpreterLauncher from classpath directly:", e);
     }
 
     return intpProcess;
   }
 
-  // Get the class load from the specified path
-  private URLClassLoader getPluginClassLoader(String pluginsDir, String pluginName)
-      throws IOException {
-    File pluginFolder = new File(pluginsDir + "/" + pluginName);
-    if (!pluginFolder.exists() || pluginFolder.isFile()) {
-      LOG.warn("PluginFolder {} doesn't exist or is not a directory", pluginFolder.getAbsolutePath());
-      return null;
-    }
-    List<URL> urls = new ArrayList<>();
-    for (File file : pluginFolder.listFiles()) {
-      LOG.debug("Add file {} to classpath of plugin: {}", file.getAbsolutePath(), pluginName);
-      urls.add(file.toURI().toURL());
-    }
-    if (urls.isEmpty()) {
-      LOG.warn("Can not load plugin {}, because the plugin folder {} is empty.", pluginName, pluginFolder);
-      return null;
-    }
-    return new URLClassLoader(urls.toArray(new URL[0]));
-  }
-
-  protected Properties mergeZeplPyIntpProp(Properties newProps) {
-    Properties properties = new Properties();
-    // Max number of dataframe rows to display.
-    properties.setProperty("zeppelin.python.maxResult", "1000");
-    // whether use IPython when it is available
-    properties.setProperty("zeppelin.python.useIPython", "false");
-    properties.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
-
-    if (null != newProps) {
-      newProps.putAll(properties);
-      return newProps;
-    } else {
-      return properties;
-    }
-  }
-
-  protected Properties mergeZeplSparkIntpProp(Properties newProps) {
-    Properties properties = new Properties();
-
-    properties.setProperty("zeppelin.spark.maxResult", "1000");
-    properties.setProperty("zeppelin.spark.scala.color", "false");
-
-    if (null != newProps) {
-      newProps.putAll(properties);
-      return newProps;
-    } else {
-      return properties;
-    }
+  public boolean isRunning() {
+    return isRunning.get();
   }
 
-  protected static InterpreterContext getIntpContext() {
-    return InterpreterContext.builder()
-        .setInterpreterOut(new InterpreterOutput(null))
-        .build();
-  }
 
-  @Override
   public void shutdown() {
     isRunning.set(false);
   }
 
-  @Override
-  public void open() {
-    LOG.error("Please implement the open() method of the child class!");
-  }
 
-  @Override
-  public InterpreterResult interpret(String code) {
-    LOG.error("Please implement the interpret() method of the child class!");
-    return null;
-  }
+  public static void main(String[] args) throws InterruptedException, IOException {
+    String interpreterType = args[0];
+    String interpreterId = args[1];
+    boolean onlyTest = false;
+    if (args.length == 3 && StringUtils.equals(args[2], "test")) {
+      onlyTest = true;
+    }
 
-  @Override
-  public void close() {
-    LOG.error("Please implement the close() method of the child class!");
-  }
+    InterpreterProcess interpreterProcess = new InterpreterProcess(interpreterType, interpreterId, onlyTest);
+    interpreterProcess.start();
 
-  @Override
-  public void cancel() {
-    LOG.error("Please implement the cancel() method of the child class!");
-  }
+    // add signal handler
+    Signal.handle(new Signal("TERM"), signal -> {
+      // clean
+      LOG.info("handle signal:{}", signal);
+    });
 
-  @Override
-  public int getProgress() {
-    LOG.error("Please implement the getProgress() method of the child class!");
-    return 0;
+    interpreterProcess.join();
+    System.exit(0);
   }
 
   @Override
-  public boolean test() {
-    LOG.error("Please implement the test() method of the child class!");
-    return false;
+  public void run() {
+    isRunning.set(true);
+    while (isRunning.get()) {
+      try {
+        // TODO(Xun Liu): Next PR will add Thrift Server in here
+        LOG.info("Mock TServer run ...");
+        sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
   }
 }
diff --git a/submarine-workbench/interpreter/python-interpreter/pom.xml b/submarine-workbench/interpreter/python-interpreter/pom.xml
index 7a18634..d589a94 100644
--- a/submarine-workbench/interpreter/python-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/python-interpreter/pom.xml
@@ -228,7 +228,7 @@
               </filters>
               <transformers>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                  <mainClass>org.apache.submarine.interpreter.InterpreterProcess</mainClass>
+                  <mainClass>org.apache.submarine.interpreter.AbstractInterpreter</mainClass>
                 </transformer>
                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
               </transformers>
diff --git a/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java b/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
index e413684..699a85e 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
+++ b/submarine-workbench/interpreter/python-interpreter/src/main/java/org/apache/submarine/interpreter/PythonInterpreter.java
@@ -19,104 +19,61 @@
 package org.apache.submarine.interpreter;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.List;
 import java.util.Properties;
 
-public class PythonInterpreter extends InterpreterProcess {
+public class PythonInterpreter extends AbstractInterpreter {
   private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);
 
-  private org.apache.zeppelin.python.PythonInterpreter zplePythonInterpreter;
-  private InterpreterContext intpContext;
-
   public PythonInterpreter() {
-    Properties properties = new Properties();
-    properties = mergeZeplPyIntpProp(properties);
-    zplePythonInterpreter = new org.apache.zeppelin.python.PythonInterpreter(properties);
-    zplePythonInterpreter.setInterpreterGroup(new InterpreterGroup());
-    intpContext = this.getIntpContext();
-  }
-
-  @Override
-  public void open() {
-    try {
-      zplePythonInterpreter.open();
-    } catch (InterpreterException e) {
-      LOG.error(e.getMessage(), e);
-    }
+    this(new Properties());
   }
 
-  @Override
-  public InterpreterResult interpret(String code) {
-    InterpreterResult interpreterResult = null;
-    try {
-      org.apache.zeppelin.interpreter.InterpreterResult zeplInterpreterResult
-          = zplePythonInterpreter.interpret(code, intpContext);
-      interpreterResult = new InterpreterResult(zeplInterpreterResult);
-
-      List<org.apache.zeppelin.interpreter.InterpreterResultMessage> interpreterResultMessages =
-          intpContext.out.toInterpreterResultMessage();
-
-      for (org.apache.zeppelin.interpreter.InterpreterResultMessage message : interpreterResultMessages) {
-        interpreterResult.add(message);
-      }
-    } catch (InterpreterException | IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
-
-    return interpreterResult;
-  }
-
-  @Override
-  public void close() {
-    try {
-      zplePythonInterpreter.close();
-    } catch (InterpreterException e) {
-      LOG.error(e.getMessage(), e);
-    }
+  public PythonInterpreter(Properties properties){
+    properties = mergeZeppelinInterpreterProperties(properties);
+    this.zeppelinInterpreter = new org.apache.zeppelin.python.PythonInterpreter(properties);
+    this.zeppelinInterpreter.setInterpreterGroup(new InterpreterGroup());
   }
 
   @Override
-  public void cancel() {
-    try {
-      zplePythonInterpreter.cancel(intpContext);
-    } catch (InterpreterException e) {
-      LOG.error(e.getMessage(), e);
+  protected Properties mergeZeppelinInterpreterProperties(Properties properties) {
+    properties = super.mergeZeppelinInterpreterProperties(properties);
+    Properties newProps = new Properties();
+    // Max number of dataframe rows to display.
+    newProps.setProperty("zeppelin.python.maxResult", "1000");
+    // whether use IPython when it is available
+    newProps.setProperty("zeppelin.python.useIPython", "false");
+    newProps.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
+
+    if (null != properties) {
+      newProps.putAll(properties);
     }
+    return newProps;
   }
 
   @Override
-  public int getProgress() {
-    int process = 0;
+  public boolean test() {
     try {
-      process = zplePythonInterpreter.getProgress(intpContext);
+      open();
+      String code = "1 + 1";
+      InterpreterResult result = interpret(code);
+      LOG.info("Execution Python Interpreter, Calculation formula {}, Result = {}",
+               code, result.message().get(0).getData()
+      );
+      if (result.code() != InterpreterResult.Code.SUCCESS) {
+        return false;
+      }
+      if (StringUtils.equals(result.message().get(0).getData(), "2\n")) {
+        return true;
+      }
+      return false;
     } catch (InterpreterException e) {
       LOG.error(e.getMessage(), e);
-    }
-
-    return process;
-  }
-
-  @Override
-  public boolean test() {
-    open();
-    String code = "1 + 1";
-    InterpreterResult result = interpret(code);
-    LOG.info("Execution Python Interpreter, Calculation formula {}, Result = {}",
-        code, result.message().get(0).getData());
-
-    if (result.code() != InterpreterResult.Code.SUCCESS) {
       return false;
     }
-    if (StringUtils.equals(result.message().get(0).getData(), "2\n")) {
-      return true;
-    }
-    return false;
   }
 }
diff --git a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
index 2a2da4a..8fc91f7 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
+++ b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/InterpreterClusterTest.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
 import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 import static org.apache.submarine.commons.cluster.meta.ClusterMetaType.SERVER_META;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class InterpreterClusterTest {
   private static Logger LOG = LoggerFactory.getLogger(InterpreterClusterTest.class);
@@ -154,6 +155,6 @@ public class InterpreterClusterTest {
       }
       Thread.sleep(200);
     }
-    assertEquals(false, interpreterProcess.isRunning());
+    assertFalse(interpreterProcess.isRunning());
   }
 }
diff --git a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/PythonInterpreterTest.java b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/PythonInterpreterTest.java
index d897da0..8096ba7 100644
--- a/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/PythonInterpreterTest.java
+++ b/submarine-workbench/interpreter/python-interpreter/src/test/java/org/apache/submarine/interpreter/PythonInterpreterTest.java
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class PythonInterpreterTest {
   private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterTest.class);
@@ -40,7 +41,7 @@ public class PythonInterpreterTest {
   private static PythonInterpreter pythonInterpreterForClose = null;
 
   @BeforeClass
-  public static void setUp() {
+  public static void setUp() throws InterpreterException {
     pythonInterpreterForCancel = new PythonInterpreter();
     pythonInterpreterForClose = new PythonInterpreter();
     pythonInterpreterForCancel.open();
@@ -48,7 +49,7 @@ public class PythonInterpreterTest {
   }
 
   @AfterClass
-  public static void tearDown()  {
+  public static void tearDown() throws InterpreterException {
     if (null != pythonInterpreterForCancel) {
       pythonInterpreterForCancel.close();
     }
@@ -59,7 +60,7 @@ public class PythonInterpreterTest {
 
 
   @Test
-  public void calcOnePlusOne() {
+  public void calcOnePlusOne() throws InterpreterException {
     String code = "1+1";
     InterpreterResult result = pythonInterpreterForCancel.interpret(code);
     LOG.info("result = {}", result);
@@ -69,30 +70,40 @@ public class PythonInterpreterTest {
     assertEquals(result.message().get(0).getData(), "2\n");
   }
 
-  private class infinityPythonJobforCancel implements Runnable {
+  private static class infinityPythonJobforCancel implements Runnable {
     @Override
     public void run() {
       String code = "import time\nwhile True:\n  time.sleep(1)";
       InterpreterResult ret = null;
-      ret = pythonInterpreterForCancel.interpret(code);
-      assertNotNull(ret);
-      Pattern expectedMessage = Pattern.compile("KeyboardInterrupt");
-      Matcher m = expectedMessage.matcher(ret.message().toString());
-      assertTrue(m.find());
+      try {
+        ret = pythonInterpreterForCancel.interpret(code);
+        assertNotNull(ret);
+        Pattern expectedMessage = Pattern.compile("KeyboardInterrupt");
+        Matcher m = expectedMessage.matcher(ret.message().toString());
+        assertTrue(m.find());
+      } catch (InterpreterException e) {
+        e.printStackTrace();
+        fail();
+      }
     }
   }
 
-  private class infinityPythonJobforClose implements Runnable {
+  private static class infinityPythonJobforClose implements Runnable {
     @Override
     public void run() {
       String code = "import time\nwhile True:\n  time.sleep(1)";
-      pythonInterpreterForClose.interpret(code);
+      try {
+        pythonInterpreterForClose.interpret(code);
+      } catch (org.apache.submarine.interpreter.InterpreterException e) {
+        e.printStackTrace();
+        fail();
+      }
     }
   }
 
 
   @Test
-  public void testCloseIntp() throws InterruptedException {
+  public void testCloseIntp() throws InterruptedException, InterpreterException {
     assertEquals(InterpreterResult.Code.SUCCESS,
             pythonInterpreterForClose.interpret("1+1\n").code());
     Thread t = new Thread(new infinityPythonJobforClose());
@@ -106,7 +117,7 @@ public class PythonInterpreterTest {
 
 
   @Test
-  public void testCancelIntp() throws InterruptedException {
+  public void testCancelIntp() throws InterruptedException, InterpreterException {
     assertEquals(InterpreterResult.Code.SUCCESS,
             pythonInterpreterForCancel.interpret("1+1\n").code());
     Thread t = new Thread(new infinityPythonJobforCancel());
diff --git a/submarine-workbench/interpreter/spark-interpreter/README.md b/submarine-workbench/interpreter/spark-interpreter/README.md
index 9b210ce..3135511 100644
--- a/submarine-workbench/interpreter/spark-interpreter/README.md
+++ b/submarine-workbench/interpreter/spark-interpreter/README.md
@@ -17,13 +17,16 @@
 ## Test Submarine Spark Interpreter
 
 ### Execute test command
-```
-export SUBMARINE_HOME=/path/to/your/submarine_home
-java -jar spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark spark-interpreter-id test
+```bash
+# spark interpreter
+java -jar submarine-spark-interpreter-{{version}}-shade.jar spark spark-interpreter-id test
+# sparkSQL interpreter
+java -jar submarine-spark-interpreter-{{version}}-shade.jar sparksql sparkSQL-interpreter-id test
 ```
 
 ### Print test result 
-```
+#### Spark
+```log
  INFO [2019-11-09 11:12:04,888] ({main} ContextHandler.java[doStart]:781) - Started o.s.j.s.ServletContextHandler@58b97c15{/stages/stage/kill,null,AVAILABLE,@Spark}
  INFO [2019-11-09 11:12:04,889] ({main} Logging.scala[logInfo]:54) - Bound SparkUI to 0.0.0.0, and started at http://10.0.0.3:4040
  INFO [2019-11-09 11:12:04,923] ({main} Logging.scala[logInfo]:54) - Starting executor ID driver on host localhost
@@ -63,13 +66,30 @@ java -jar spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark spark-interpreter-id
  INFO [2019-11-09 11:12:08,553] ({shutdown-hook-0} Logging.scala[logInfo]:54) - Deleting directory /private/var/folders/xl/_xb3fgzj5zd698khfz6z74cc0000gn/T/spark-2f4acad9-a72d-4bca-8d85-3ef310f0b08c
 ```
 
+#### SparkSQL
+```log
+ INFO [2019-11-25 19:13:05,993] ({task-result-getter-0} Logging.scala[logInfo]:54) - Finished task 0.0 in stage 0.0 (TID 0) in 79 ms on localhost (executor driver) (1/1)
+ INFO [2019-11-25 19:13:05,994] ({task-result-getter-0} Logging.scala[logInfo]:54) - Removed TaskSet 0.0, whose tasks have all completed, from pool 
+ INFO [2019-11-25 19:13:05,997] ({dag-scheduler-event-loop} Logging.scala[logInfo]:54) - ResultStage 0 (takeAsList at Spark2Shims.java:65) finished in 0.093 s
+DEBUG [2019-11-25 19:13:05,999] ({dag-scheduler-event-loop} Logging.scala[logDebug]:58) - After removal of stage 0, remaining stages = 0
+ INFO [2019-11-25 19:13:06,000] ({main} Logging.scala[logInfo]:54) - Job 0 finished: takeAsList at Spark2Shims.java:65, took 0.744646 s
+ INFO [2019-11-25 19:13:06,006] ({main} SparkSqlInterpreter.java[test]:71) - Execution SparkSQL Interpreter, Calculation Spark Code  SUCCESS, Result = 
+group    person
+g1      [moon,33]
+g2      [sun,11]
+
+ INFO [2019-11-25 19:13:06,006] ({main} InterpreterProcess.java[<init>]:51) - Interpreter test result: true
+ INFO [2019-11-25 19:13:06,009] ({shutdown-hook-0} Logging.scala[logInfo]:54) - Invoking stop() from shutdown hook
+
+```
+
 
 ## Debug Submarine Spark Interpreter
 
 ### Execute debug command
 
 ```
-java -jar -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 spark-interpreter-0.3.0-SNAPSHOT-shade.jar spark spark-interpreter-id
+java -jar -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 spark-interpreter-{{version}}-shade.jar spark spark-interpreter-id
 ```
 
 Connect via remote debugging in IDEA
diff --git a/submarine-workbench/interpreter/spark-interpreter/pom.xml b/submarine-workbench/interpreter/spark-interpreter/pom.xml
index d9a9b05..4388173 100644
--- a/submarine-workbench/interpreter/spark-interpreter/pom.xml
+++ b/submarine-workbench/interpreter/spark-interpreter/pom.xml
@@ -142,6 +142,21 @@
       <artifactId>slf4j-api</artifactId>
       <version>${slf4j.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-scala-2.10</artifactId>
+      <version>${zeppelin.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-scala-2.12</artifactId>
+      <version>${zeppelin.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark-scala-2.11</artifactId>
+      <version>${zeppelin.version}</version>
+    </dependency>
 
     <!-- Test libraries -->
     <dependency>
@@ -162,17 +177,18 @@
       <version>${zeppelin.version}</version>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.zeppelin</groupId>
-      <artifactId>spark-scala-2.11</artifactId>
-      <version>0.9.0-SNAPSHOT</version>
-      <scope>provided</scope>
-    </dependency>
-
   </dependencies>
 
   <build>
     <finalName>submarine-${artifactId}-${project.version}</finalName>
+    <testResources>
+      <testResource>
+        <directory>${project.basedir}/src/test/resources</directory>
+      </testResource>
+      <testResource>
+        <directory>${project.basedir}/src/main/resources</directory>
+      </testResource>
+    </testResources>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -233,32 +249,6 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-dependency-plugin</artifactId>
-
-        <executions>
-          <execution>
-            <id>copy-dependencies-runtime</id>
-            <phase>validate</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-            <configuration>
-              <includeArtifactIds>spark-scala-2.11</includeArtifactIds>
-              <includeGroupIds>org.apache.zeppelin</includeGroupIds>
-              <outputDirectory>../spark/scala-2.11</outputDirectory>
-            </configuration>
-          </execution>
-          <execution>
-            <id>copy-dependencies-system</id>
-            <phase>package</phase>
-            <goals>
-              <goal>copy-dependencies</goal>
-            </goals>
-            <configuration>
-              <includeScope>system</includeScope>
-              <excludeTransitive>true</excludeTransitive>
-            </configuration>
-          </execution>
-        </executions>
       </plugin>
 
     </plugins>
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
index 40ee160..132ef60 100644
--- a/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
+++ b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkInterpreter.java
@@ -18,158 +18,76 @@
  */
 package org.apache.submarine.interpreter;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.spark.SparkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
 
-public class SparkInterpreter extends InterpreterProcess {
+public class SparkInterpreter extends AbstractInterpreter {
   private static final Logger LOG = LoggerFactory.getLogger(SparkInterpreter.class);
 
-  private org.apache.zeppelin.spark.SparkInterpreter zpleSparkInterpreter;
-  private InterpreterContext intpContext;
-
-  private String extractScalaVersion() throws InterpreterException {
-    String scalaVersionString = scala.util.Properties.versionString();
-    LOG.info("Using Scala: " + scalaVersionString);
-    if (scalaVersionString.contains("version 2.10")) {
-      return "2.10";
-    } else if (scalaVersionString.contains("version 2.11")) {
-      return "2.11";
-    } else if (scalaVersionString.contains("version 2.12")) {
-      return "2.12";
-    } else {
-      throw new InterpreterException("Unsupported scala version: " + scalaVersionString);
-    }
-  }
-
   public SparkInterpreter(Properties properties) {
-    properties = mergeZeplSparkIntpProp(properties);
-    zpleSparkInterpreter = new org.apache.zeppelin.spark.SparkInterpreter(properties);
-    zpleSparkInterpreter.setInterpreterGroup(new InterpreterGroup());
-    intpContext = this.getIntpContext();
+    properties = mergeZeppelinInterpreterProperties(properties);
+    this.zeppelinInterpreter = new org.apache.zeppelin.spark.SparkInterpreter(properties);
+    this.setInterpreterGroup(new InterpreterGroup());
   }
+
   public SparkInterpreter() {
     this(new Properties());
   }
 
   @Override
-  public void open() {
+  public boolean test() {
     try {
-      ClassLoader scalaInterpreterClassLoader = null;
-      String submarineHome = System.getenv("SUBMARINE_HOME");
-      String interpreterDir  = "";
-      if (StringUtils.isBlank(submarineHome)) {
-        LOG.warn("SUBMARINE_HOME is not set, default interpreter directory is ../ ");
-        interpreterDir = "..";
-      } else {
-        interpreterDir = submarineHome + "/workbench/interpreter";
+      open();
+      String code = "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+              "df.show()";
+      InterpreterResult result = interpret(code);
+      LOG.info("Execution Spark Interpreter, Calculation Spark Code  {}, Result = {}",
+               code, result.message().get(0).getData()
+      );
+
+      if (result.code() != InterpreterResult.Code.SUCCESS) {
+        close();
+        return false;
       }
-      String scalaVersion = extractScalaVersion();
-      File scalaJarFolder = new File(interpreterDir + "/spark/scala-" + scalaVersion);
-      List<URL> urls = new ArrayList<>();
-      for (File file : scalaJarFolder.listFiles()) {
-        LOG.info("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: "
-                + scalaJarFolder);
-        urls.add(file.toURI().toURL());
-      }
-      scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]),
-              Thread.currentThread().getContextClassLoader());
-      if (scalaInterpreterClassLoader != null) {
-        Thread.currentThread().setContextClassLoader(scalaInterpreterClassLoader);
-      }
-      zpleSparkInterpreter.open();
+      boolean success = (result.message().get(0).getData().contains(
+              "+---+----+\n" +
+              "| _1|  _2|\n" +
+              "+---+----+\n" +
+              "|  1|   a|\n" +
+              "|  2|null|\n" +
+              "+---+----+")
+      );
+      close();
+      return success;
     } catch (InterpreterException e) {
       LOG.error(e.getMessage(), e);
-    } catch (MalformedURLException e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public InterpreterResult interpret(String code) {
-    InterpreterResult interpreterResult = null;
-    try {
-      org.apache.zeppelin.interpreter.InterpreterResult zeplInterpreterResult
-              = zpleSparkInterpreter.interpret(code, intpContext);
-      interpreterResult = new InterpreterResult(zeplInterpreterResult);
-
-      List<InterpreterResultMessage> interpreterResultMessages =
-              intpContext.out.toInterpreterResultMessage();
-
-      for (org.apache.zeppelin.interpreter.InterpreterResultMessage message : interpreterResultMessages) {
-        interpreterResult.add(message);
-      }
-    } catch (InterpreterException | IOException e) {
-      LOG.error(e.getMessage(), e);
+      return false;
     }
-
-    return interpreterResult;
   }
 
-  @Override
-  public void close() {
-    try {
-      zpleSparkInterpreter.close();
-    } catch (InterpreterException e) {
-      LOG.error(e.getMessage(), e);
-    }
+  public SparkContext getSparkContext() {
+    return ((org.apache.zeppelin.spark.SparkInterpreter) this.zeppelinInterpreter).getSparkContext();
   }
 
-  @Override
-  public void cancel() {
-    try {
-      zpleSparkInterpreter.cancel(intpContext);
-    } catch (InterpreterException e) {
-      LOG.error(e.getMessage(), e);
-    }
+  public void setSchedulerPool(String pool){
+    this.getIntpContext().getLocalProperties().put("pool", pool);
   }
 
   @Override
-  public int getProgress() {
-    int process = 0;
-    try {
-      process = zpleSparkInterpreter.getProgress(intpContext);
-    } catch (InterpreterException e) {
-      LOG.error(e.getMessage(), e);
-    }
-
-    return process;
-  }
+  protected Properties mergeZeppelinInterpreterProperties(Properties properties) {
+    properties = super.mergeZeppelinInterpreterProperties(properties);
+    Properties defaultProperties = new Properties();
 
-  @Override
-  public boolean test() {
-    open();
-    String code = "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
-        "df.show()";
-    InterpreterResult result = interpret(code);
-    LOG.info("Execution Spark Interpreter, Calculation Spark Code  {}, Result = {}",
-             code, result.message().get(0).getData());
+    defaultProperties.setProperty("zeppelin.spark.maxResult", "1000");
+    defaultProperties.setProperty("zeppelin.spark.scala.color", "false");
 
-    if (result.code() != InterpreterResult.Code.SUCCESS) {
-      close();
-      return false;
+    if (null != properties) {
+      defaultProperties.putAll(properties);
     }
-    boolean success = (result.message().get(0).getData().contains(
-            "+---+----+\n" +
-                "| _1|  _2|\n" +
-                "+---+----+\n" +
-                "|  1|   a|\n" +
-                "|  2|null|\n" +
-                "+---+----+"));
-    close();
-    return success;
+    return defaultProperties;
   }
+
 }
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkSqlInterpreter.java b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkSqlInterpreter.java
new file mode 100644
index 0000000..a9732cb
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/main/java/org/apache/submarine/interpreter/SparkSqlInterpreter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.Properties;
+
+public class SparkSqlInterpreter extends AbstractInterpreter {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkInterpreter.class);
+
+  public SparkSqlInterpreter(Properties properties) {
+    properties = mergeZeppelinInterpreterProperties(properties);
+    this.zeppelinInterpreter = new org.apache.zeppelin.spark.SparkSqlInterpreter(properties);
+  }
+
+  public SparkSqlInterpreter() {
+    this(new Properties());
+  }
+
+
+  @Override
+  public boolean test() {
+    try {
+      InterpreterGroup intpGroup = new InterpreterGroup();
+      SparkInterpreter sparkInterpreter = new SparkInterpreter();
+      sparkInterpreter.setInterpreterGroup(intpGroup);
+
+      this.setInterpreterGroup(intpGroup);
+
+      String session = "session_1";
+      intpGroup.put(session, new LinkedList<>());
+      sparkInterpreter.addToSession(session);
+      this.addToSession(session);
+
+      sparkInterpreter.open();
+      open();
+
+      sparkInterpreter.interpret("case class Person(name:String, age:Int)");
+      sparkInterpreter.interpret("case class People(group:String, person:Person)");
+      sparkInterpreter.interpret(
+              "val gr = sc.parallelize(Seq(" +
+                      "People(\"g1\", " +
+                      "Person(\"moon\",33)), " +
+                      "People(\"g2\", " +
+                      "Person(\"sun\",11" +
+                      "))))");
+      sparkInterpreter.interpret("gr.toDF.registerTempTable(\"gr\")");
+
+      InterpreterResult result = interpret("select * from gr");
+      LOG.info("Execution SparkSQL Interpreter, Calculation Spark Code {}, Result =\n {}",
+               result.code(), result.message().get(0).getData()
+      );
+      if (result.code() != InterpreterResult.Code.SUCCESS) {
+        return false;
+      }
+      if (!result.message().get(0).getData().contains("[moon,33]") ||
+              !result.message().get(0).getData().contains("[sun,11]")) {
+        return false;
+      }
+      return true;
+    } catch (InterpreterException e) {
+      LOG.error(e.getMessage(), e);
+      return false;
+    }
+  }
+
+  public void setResultLimits(long number){
+    getIntpContext().getLocalProperties().put("limit", String.valueOf(number));
+  }
+}
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
index 9672e21..3a2f8b6 100644
--- a/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
+++ b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkInterpreterTest.java
@@ -17,65 +17,42 @@
 
 package org.apache.submarine.interpreter;
 
-import org.apache.zeppelin.display.AngularObjectRegistry;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.fail;
 
 public class SparkInterpreterTest {
   private SparkInterpreter interpreter;
 
   // catch the streaming output in onAppend
   private volatile String output = "";
-  // catch the interpreter output in onUpdate
-  private InterpreterResultMessageOutput messageOutput;
-
-  private RemoteInterpreterEventClient mockRemoteEventClient;
 
   @Before
   public void setUp() {
-    mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
+    // SparkInterpreter will change the current thread's classLoader
+    Thread.currentThread().setContextClassLoader(SparkInterpreterTest.class.getClassLoader());
   }
 
   @Test
-  public void testSparkInterpreter() throws InterruptedException {
+  public void testSparkInterpreter() throws InterruptedException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
-    properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
+    properties.setProperty("submarine.spark.scala.color", "false");
+    properties.setProperty("submarine.spark.test", "true");
+    properties.setProperty("submarine.spark.uiWebUrl", "fake_spark_weburl");
     // disable color output for easy testing
-    properties.setProperty("zeppelin.spark.scala.color", "false");
-    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
-
-    InterpreterContext context = InterpreterContext.builder()
-        .setInterpreterOut(new InterpreterOutput(null))
-        .setIntpEventClient(mockRemoteEventClient)
-        .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
-        .build();
-    InterpreterContext.set(context);
-
-    interpreter = new SparkInterpreter();
-    try {
-      interpreter.open();
-    } catch (Throwable ex) {
-      ex.printStackTrace();
-    }
+    properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
 
+    interpreter = new SparkInterpreter(properties);
+
+    interpreter.open();
 
     InterpreterResult result = interpreter.interpret("val a=\"hello world\"");
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -126,24 +103,23 @@ public class SparkInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     // Companion object with case class
-    result = interpreter.interpret("import scala.math._\n" +
-        "object Circle {\n" +
-        "  private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)\n" +
-        "}\n" +
-        "case class Circle(radius: Double) {\n" +
-        "  import Circle._\n" +
-        "  def area: Double = calculateArea(radius)\n" +
-        "}\n" +
-        "\n" +
-        "val circle1 = new Circle(5.0)");
+    result = interpreter.interpret(
+            "import scala.math._\n" +
+                  "object Circle {\n" +
+                  "private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)\n" +
+                  "}\n" +
+                  "case class Circle(radius: Double) {\n" +
+                  "  import Circle._\n" +
+                  "  def area: Double = calculateArea(radius)\n" +
+                  "}\n" +
+                  "\n" +
+                  "val circle1 = new Circle(5.0)");
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     // class extend
     result = interpreter.interpret("import java.util.ArrayList");
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    context = getInterpreterContext();
-    context.setParagraphId("pid_1");
     result = interpreter.interpret("sc\n.range(1, 10)\n.sum");
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertTrue(result.message().get(0).getData().contains("45"));
@@ -154,15 +130,15 @@ public class SparkInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     result = interpreter.interpret(
-        "case class Bank(age:Integer, job:String, marital : String, edu : String, balance : Integer)\n" +
-            "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
-            "    s => Bank(s(0).toInt, \n" +
-            "            s(1).replaceAll(\"\\\"\", \"\"),\n" +
-            "            s(2).replaceAll(\"\\\"\", \"\"),\n" +
-            "            s(3).replaceAll(\"\\\"\", \"\"),\n" +
-            "            s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
-            "        )\n" +
-            ").toDF()");
+            "case class Bank(age:Integer, job:String, marital : String, edu : String, balance : Integer)\n" +
+                  "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
+                  "    s => Bank(s(0).toInt, \n" +
+                  "            s(1).replaceAll(\"\\\"\", \"\"),\n" +
+                  "            s(2).replaceAll(\"\\\"\", \"\"),\n" +
+                  "            s(3).replaceAll(\"\\\"\", \"\"),\n" +
+                  "            s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
+                  "        )\n" +
+                  ").toDF()");
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     // spark version
@@ -176,11 +152,11 @@ public class SparkInterpreterTest {
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
       result = interpreter.interpret(
-          "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
-              "df.show()");
+              "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+                    "df.show()");
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertTrue(result.message().get(0).getData().contains(
-          "+---+----+\n" +
+              "+---+----+\n" +
               "| _1|  _2|\n" +
               "+---+----+\n" +
               "|  1|   a|\n" +
@@ -191,11 +167,11 @@ public class SparkInterpreterTest {
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
       result = interpreter.interpret(
-          "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
-              "df.show()");
+              "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+                      "df.show()");
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertTrue(result.message().get(0).getData().contains(
-          "+---+----+\n" +
+              "+---+----+\n" +
               "| _1|  _2|\n" +
               "+---+----+\n" +
               "|  1|   a|\n" +
@@ -203,7 +179,7 @@ public class SparkInterpreterTest {
               "+---+----+"));
     }
 
-    // ZeppelinContext
+    // submarineContext
     result = interpreter.interpret("z.show(df)");
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
@@ -213,15 +189,16 @@ public class SparkInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     // getProgress;
-    Thread interpretThread = new Thread() {
-      @Override
-      public void run() {
-        InterpreterResult result = null;
-        result = interpreter.interpret(
-            "val df = sc.parallelize(1 to 10, 5).foreach(e=>Thread.sleep(1000))");
-        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    Thread interpretThread = new Thread(() -> {
+      try {
+        InterpreterResult result1 = interpreter.interpret(
+                "val df = sc.parallelize(1 to 10, 5).foreach(e=>Thread.sleep(1000))");
+        assertEquals(InterpreterResult.Code.SUCCESS, result1.code());
+      } catch (InterpreterException e) {
+        e.printStackTrace();
+        fail();
       }
-    };
+    });
     interpretThread.start();
     boolean nonZeroProgress = false;
     int progress = 0;
@@ -235,16 +212,17 @@ public class SparkInterpreterTest {
     }
     assertTrue(nonZeroProgress);
 
-    interpretThread = new Thread() {
-      @Override
-      public void run() {
-        InterpreterResult result = null;
-        result = interpreter.interpret(
-            "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))");
-        assertEquals(InterpreterResult.Code.ERROR, result.code());
-        assertTrue(result.message().get(0).getData().contains("cancelled"));
+    interpretThread = new Thread(() -> {
+      try {
+        InterpreterResult result12 = interpreter.interpret(
+                "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))");
+        assertEquals(InterpreterResult.Code.ERROR, result12.code());
+        assertTrue(result12.message().get(0).getData().contains("cancelled"));
+      } catch (org.apache.submarine.interpreter.InterpreterException e) {
+        e.printStackTrace();
+        fail();
       }
-    };
+    });
 
     interpretThread.start();
     // sleep 1 second to wait for the spark job start
@@ -254,19 +232,18 @@ public class SparkInterpreterTest {
   }
 
   @Test
-  public void testDisableReplOutput() {
+  public void testDisableReplOutput() throws InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
-    properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.printREPLOutput", "false");
+    properties.setProperty("submarine.spark.maxResult", "100");
+    properties.setProperty("submarine.spark.test", "true");
+    properties.setProperty("submarine.spark.printREPLOutput", "false");
     // disable color output for easy testing
-    properties.setProperty("zeppelin.spark.scala.color", "false");
-    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+    properties.setProperty("submarine.spark.scala.color", "false");
+    properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
 
-    InterpreterContext.set(getInterpreterContext());
-    interpreter = new SparkInterpreter();
+    interpreter = new SparkInterpreter(properties);
     interpreter.open();
 
     InterpreterResult result = interpreter.interpret("val a=\"hello world\"");
@@ -281,42 +258,41 @@ public class SparkInterpreterTest {
   }
 
   @Test
-  public void testSchedulePool() {
+  public void testSchedulePool() throws InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
-    properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("submarine.spark.maxResult", "100");
+    properties.setProperty("submarine.spark.test", "true");
     properties.setProperty("spark.scheduler.mode", "FAIR");
     // disable color output for easy testing
-    properties.setProperty("zeppelin.spark.scala.color", "false");
-    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+    properties.setProperty("submarine.spark.scala.color", "false");
+    properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
+    properties.setProperty("spark.scheduler.pool", "pool1");
 
-    interpreter = new SparkInterpreter();
-    InterpreterContext.set(getInterpreterContext());
+    interpreter = new SparkInterpreter(properties);
+    interpreter.setSchedulerPool("pool1");
     interpreter.open();
 
     InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
-    // pool is reset to null if user don't specify it via paragraph properties
-    result = interpreter.interpret("sc.range(1, 10).sum");
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("pool1", interpreter.getSparkContext().getLocalProperty("spark.scheduler.pool"));
   }
 
   // spark.ui.enabled: false
   @Test
-  public void testDisableSparkUI_1() {
+  public void testDisableSparkUI_1() throws InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
-    properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("submarine.spark.maxResult", "100");
+    properties.setProperty("submarine.spark.test", "true");
     properties.setProperty("spark.ui.enabled", "false");
     // disable color output for easy testing
-    properties.setProperty("zeppelin.spark.scala.color", "false");
-    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+    properties.setProperty("submarine.spark.scala.color", "false");
+    properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
 
-    interpreter = new SparkInterpreter();
-    InterpreterContext.set(getInterpreterContext());
+    interpreter = new SparkInterpreter(properties);
     interpreter.open();
 
     InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
@@ -324,21 +300,20 @@ public class SparkInterpreterTest {
 
   }
 
-  // zeppelin.spark.ui.hidden: true
+  // submarine.spark.ui.hidden: true
   @Test
-  public void testDisableSparkUI_2() {
+  public void testDisableSparkUI_2() throws InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
-    properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.test", "true");
-    properties.setProperty("zeppelin.spark.ui.hidden", "true");
+    properties.setProperty("submarine.spark.maxResult", "100");
+    properties.setProperty("submarine.spark.test", "true");
+    properties.setProperty("submarine.spark.ui.hidden", "true");
     // disable color output for easy testing
-    properties.setProperty("zeppelin.spark.scala.color", "false");
-    properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false");
+    properties.setProperty("submarine.spark.scala.color", "false");
+    properties.setProperty("submarine.spark.deprecatedMsg.show", "false");
 
-    interpreter = new SparkInterpreter();
-    InterpreterContext.set(getInterpreterContext());
+    interpreter = new SparkInterpreter(properties);
     interpreter.open();
 
     InterpreterResult result = interpreter.interpret("sc.range(1, 10).sum");
@@ -347,41 +322,9 @@ public class SparkInterpreterTest {
 
 
   @After
-  public void tearDown() {
+  public void tearDown() throws InterpreterException {
     if (this.interpreter != null) {
       this.interpreter.close();
     }
   }
-
-  private InterpreterContext getInterpreterContext() {
-    output = "";
-    InterpreterContext context = InterpreterContext.builder()
-        .setInterpreterOut(new InterpreterOutput(null))
-        .setIntpEventClient(mockRemoteEventClient)
-        .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
-        .build();
-    context.out = new InterpreterOutput(
-        new InterpreterOutputListener() {
-          @Override
-          public void onUpdateAll(InterpreterOutput out) {
-
-          }
-
-          @Override
-          public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
-            try {
-              output = out.toInterpreterResultMessage().getData();
-            } catch (IOException e) {
-              e.printStackTrace();
-            }
-          }
-
-          @Override
-          public void onUpdate(int index, InterpreterResultMessageOutput out) {
-            messageOutput = out;
-          }
-        }
-    );
-    return context;
-  }
 }
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkSqlInterpreterTest.java b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkSqlInterpreterTest.java
new file mode 100644
index 0000000..ea3d9a1
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/test/java/org/apache/submarine/interpreter/SparkSqlInterpreterTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.submarine.interpreter;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SparkSqlInterpreterTest {
+
+  private static SparkSqlInterpreter sqlInterpreter;
+  private static SparkInterpreter sparkInterpreter;
+
+  @BeforeClass
+  public static void setUp() throws InterpreterException {
+    Properties p = new Properties();
+    p.setProperty("spark.master", "local[4]");
+    p.setProperty("spark.app.name", "test");
+    p.setProperty("submarine.spark.maxResult", "10");
+    p.setProperty("submarine.spark.concurrentSQL", "true");
+    p.setProperty("submarine.spark.sql.stacktrace", "true");
+    p.setProperty("submarine.spark.useHiveContext", "true");
+    p.setProperty("submarine.spark.deprecatedMsg.show", "false");
+
+    sqlInterpreter = new SparkSqlInterpreter(p);
+
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    sparkInterpreter = new SparkInterpreter(p);
+    sparkInterpreter.setInterpreterGroup(intpGroup);
+
+    sqlInterpreter = new SparkSqlInterpreter(p);
+    sqlInterpreter.setInterpreterGroup(intpGroup);
+
+    String session = "session_1";
+    intpGroup.put(session, new LinkedList<>());
+    sparkInterpreter.addToSession(session);
+    sqlInterpreter.addToSession(session);
+
+    //SparkInterpreter will change the current thread's classLoader
+    Thread.currentThread().setContextClassLoader(SparkInterpreterTest.class.getClassLoader());
+
+    sparkInterpreter.open();
+    sqlInterpreter.open();
+  }
+
+  @AfterClass
+  public static void tearDown() throws InterpreterException {
+    sqlInterpreter.close();
+  }
+
+  @Test
+  public void test() throws InterpreterException {
+    sparkInterpreter.interpret("case class Test(name:String, age:Int)");
+    sparkInterpreter.interpret(
+            "val test = sc.parallelize(Seq(" +
+                    " Test(\"moon\", 33)," +
+                    " Test(\"jobs\", 51)," +
+                    " Test(\"gates\", 51)," +
+                    " Test(\"park\", 34)" +
+                    "))");
+    sparkInterpreter.interpret("test.toDF.registerTempTable(\"test\")");
+
+    InterpreterResult ret = sqlInterpreter.interpret("select name, age from test where age < 40");
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(org.apache.submarine.interpreter.InterpreterResult.Type.TABLE,
+                 ret.message().get(0).getType()
+    );
+    assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData());
+
+    ret = sqlInterpreter.interpret("select wrong syntax");
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertTrue(ret.message().get(0).getData().length() > 0);
+
+    assertEquals(InterpreterResult.Code.SUCCESS,
+                 sqlInterpreter.interpret("select case when name='aa' then name else name end from test")
+                         .code()
+    );
+  }
+
+  @Test
+  public void testStruct() throws InterpreterException {
+    sparkInterpreter.interpret("case class Person(name:String, age:Int)");
+    sparkInterpreter.interpret("case class People(group:String, person:Person)");
+    sparkInterpreter.interpret(
+            "val gr = sc.parallelize(Seq(" +
+                    "People(\"g1\", " +
+                    "Person(\"moon\",33)), " +
+                    "People(\"g2\", " +
+                    "Person(\"sun\",11" +
+                    "))))");
+    sparkInterpreter.interpret("gr.toDF.registerTempTable(\"gr\")");
+
+    InterpreterResult ret = sqlInterpreter.interpret("select * from gr");
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    assertTrue(ret.message().get(0).getData().contains("[moon,33]"));
+    assertTrue(ret.message().get(0).getData().contains("[sun,11]"));
+  }
+
+
+  @Test
+  public void testMaxResults() throws InterpreterException {
+    sparkInterpreter.interpret("case class P(age:Int)");
+    sparkInterpreter.interpret(
+            "val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))");
+    sparkInterpreter.interpret("gr.toDF.registerTempTable(\"gr\")");
+
+    InterpreterResult ret = sqlInterpreter.interpret("select * from gr");
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    // the number of rows is 10+1, 1 is the head of table
+    assertEquals(11, ret.message().get(0).getData().split("\n").length);
+    assertTrue(ret.message().get(1).getData().contains("alert-warning"));
+
+    // test limit local property
+    sqlInterpreter.setResultLimits(5);
+    ret = sqlInterpreter.interpret("select * from gr");
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    // the number of rows is 5+1, 1 is the head of table
+    assertEquals(6, ret.message().get(0).getData().split("\n").length);
+  }
+
+  @Test
+  public void testConcurrentSQL() throws InterruptedException, InterpreterException {
+
+    sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})");
+
+
+    Thread thread1 = new Thread(() -> {
+      try {
+        InterpreterResult result = sqlInterpreter.interpret("select sleep(10)");
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      } catch (InterpreterException e) {
+        e.printStackTrace();
+      }
+    });
+
+    Thread thread2 = new Thread(() -> {
+      try {
+        InterpreterResult result = sqlInterpreter.interpret("select sleep(10)");
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      } catch (InterpreterException e) {
+        e.printStackTrace();
+      }
+    });
+
+    // start running 2 spark sql, each would sleep 10 seconds, the totally running time should
+    // be less than 20 seconds, which means they run concurrently.
+    long start = System.currentTimeMillis();
+    thread1.start();
+    thread2.start();
+    thread1.join();
+    thread2.join();
+    long end = System.currentTimeMillis();
+    assertTrue("running time must be less than 20 seconds", ((end - start) / 1000) < 20);
+
+  }
+
+  @Test
+  public void testDDL() throws InterpreterException {
+    InterpreterResult ret = sqlInterpreter.interpret("create table t1(id int, name string)");
+    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
+    // spark 1.x will still return DataFrame with non-empty columns.
+    // org.apache.spark.sql.DataFrame = [result: string]
+
+    assertTrue(ret.message().isEmpty());
+
+    // create the same table again
+    ret = sqlInterpreter.interpret("create table t1(id int, name string)");
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals(1, ret.message().size());
+    assertEquals(InterpreterResult.Type.TEXT,
+                 ret.message().get(0).getType()
+    );
+    assertTrue(ret.message().get(0).getData().contains("already exists"));
+
+    // invalid DDL
+    ret = sqlInterpreter.interpret("create temporary function udf1 as 'org.apache.submarine.UDF'");
+    assertEquals(InterpreterResult.Code.ERROR, ret.code());
+    assertEquals(1, ret.message().size());
+    assertEquals(InterpreterResult.Type.TEXT,
+                 ret.message().get(0).getType()
+    );
+
+  }
+}
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/test/resources/hive-site.xml b/submarine-workbench/interpreter/spark-interpreter/src/test/resources/hive-site.xml
new file mode 100644
index 0000000..e38c7be
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/test/resources/hive-site.xml
@@ -0,0 +1,7 @@
+<configuration>
+  <property>
+    <name>hive.metastore.warehouse.dir</name>
+    <value>${user.home}/hive/warehouse</value>
+    <description>location of default database for the warehouse</description>
+  </property>
+</configuration>
\ No newline at end of file
diff --git a/submarine-workbench/interpreter/spark-interpreter/src/test/resources/log4j.properties b/submarine-workbench/interpreter/spark-interpreter/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d7a8623
--- /dev/null
+++ b/submarine-workbench/interpreter/spark-interpreter/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+submarine.log.threshold=ALL
+submarine.root.logger=INFO, console
+
+log4j.rootLogger=${submarine.root.logger}
+
+# Logging Threshold
+log4j.threshold=${submarine.log.threshold}
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
+log4j.appender.console.encoding=UTF-8


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org