You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/06/16 05:27:58 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4844]. Support yarn interpreter launch mode

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 6fa79a9  [ZEPPELIN-4844]. Support yarn interpreter launch mode
6fa79a9 is described below

commit 6fa79a9fc743f2b4321ac9e8713b3380bb4d64c9
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat May 30 17:58:34 2020 +0800

    [ZEPPELIN-4844]. Support yarn interpreter launch mode
    
    ### What is this PR for?
    
    This PR adds support for the yarn interpreter launch mode, which launches the interpreter process in a yarn container so that we can mitigate the memory pressure on the zeppelin server machine.
    This PR adds a new launcher module `yarn` for that. I manually tested the shell, python, jdbc and flink interpreters.
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4844
    
    ### How should this be tested?
    * CI pass and manually tested
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3786 from zjffdu/ZEPPELIN-4844 and squashes the following commits:
    
    899c4e1c0 [Jeff Zhang] [ZEPPELIN-4844]. Support yarn interpreter launch mode
    
    (cherry picked from commit df54df9e3cd6a6e4413522bae5ed66aafac8f552)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .travis.yml                                        |   9 +-
 bin/common.sh                                      |   6 +-
 bin/interpreter.sh                                 |  24 +-
 bin/zeppelin-daemon.sh                             |  15 +
 .../apache/zeppelin/flink/IPyFlinkInterpreter.java |  11 +-
 .../java/org/apache/zeppelin/flink/JobManager.java |  16 +-
 .../apache/zeppelin/flink/PyFlinkInterpreter.java  |   8 -
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java   |   3 +-
 pom.xml                                            | 565 ++++++++++++++++++++-
 zeppelin-interpreter-integration/pom.xml           | 418 ++-------------
 .../zeppelin/integration/MiniHadoopCluster.java    |   4 +-
 .../YarnInterpreterLauncherIntegrationTest.java    | 170 +++++++
 zeppelin-interpreter-shaded/pom.xml                |   4 +-
 zeppelin-interpreter/pom.xml                       |   7 +
 .../remote/RemoteInterpreterServer.java            |  35 +-
 .../zeppelin/interpreter/remote/YarnUtils.java     |  79 +++
 zeppelin-plugins/launcher/yarn/pom.xml             |  95 ++++
 .../launcher/YarnInterpreterLauncher.java          |  66 +++
 .../launcher/YarnRemoteInterpreterProcess.java     | 552 ++++++++++++++++++++
 zeppelin-plugins/notebookrepo/filesystem/pom.xml   |   7 +
 zeppelin-plugins/pom.xml                           |   1 +
 zeppelin-server/pom.xml                            |  48 +-
 .../apache/zeppelin/rest/AbstractTestRestApi.java  |   9 +
 zeppelin-zengine/pom.xml                           | 401 +--------------
 .../zeppelin/interpreter/InterpreterSetting.java   |  24 +-
 .../zeppelin/conf/ZeppelinConfigurationTest.java   |  20 +-
 .../{zeppelin-site.xml => zeppelin-test-site.xml}  |   0
 27 files changed, 1755 insertions(+), 842 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index ecb2799..05779db 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -70,13 +70,18 @@ jobs:
           packages:
             - google-chrome-stable
 
-    # Test core modules (zeppelin-interpreter,zeppelin-zengine,zeppelin-server)
+    # Test core modules (zeppelin-interpreter,zeppelin-zengine,zeppelin-server) on hadoop2
     # Several tests were excluded from this configuration due to the following issues:
     # HeliumApplicationFactoryTest - https://issues.apache.org/jira/browse/ZEPPELIN-2470
     # After issues are fixed these tests need to be included back by removing them from the "-Dtests.to.exclude" property
     - jdk: "openjdk8"
       dist: xenial
-      env: BUILD_PLUGINS="true" PYTHON="3" R="true" PROFILE="-Phelium-dev -Pexamples" BUILD_FLAG="install -Pbuild-distr -DskipRat -DskipTests" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl zeppelin-server,zeppelin-web,spark/spark-dependencies,markdown,angular,shell -am" TEST_PROJECTS="-Dtests.to.exclude=**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
+      env: BUILD_PLUGINS="true" PYTHON="3" R="true" PROFILE="-Phelium-dev -Pexamples -Phadoop2" BUILD_FLAG="install -Pbuild-distr -DskipRat -DskipTests" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl zeppelin-server,zeppelin-web,spark/spark-dependencies,markdown,angular,shell -am" TEST_PROJECTS="-Dtests.to.exclude=**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
+
+    # Test core modules (zeppelin-interpreter,zeppelin-zengine,zeppelin-server) on hadoop3
+    - jdk: "openjdk8"
+      dist: xenial
+      env: BUILD_PLUGINS="true" PYTHON="3" R="true" PROFILE="-Phelium-dev -Pexamples -Phadoop3" BUILD_FLAG="install -Pbuild-distr -DskipRat -DskipTests" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl zeppelin-server,zeppelin-web,spark/spark-dependencies,markdown,angular,shell -am" TEST_PROJECTS="-Dtests.to.exclude=**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
 
     # Test selenium with spark module for spark 2.3
     - jdk: "openjdk8"
diff --git a/bin/common.sh b/bin/common.sh
index 25cc148..0fbce19 100644
--- a/bin/common.sh
+++ b/bin/common.sh
@@ -94,8 +94,8 @@ function addEachJarInDirRecursive(){
 
 function addEachJarInDirRecursiveForIntp(){
   if [[ -d "${1}" ]]; then
-    for jar in $(find -L "${1}" -type f -name '*jar'); do
-      ZEPPELIN_INTP_CLASSPATH="$jar:$ZEPPELIN_INTP_CLASSPATH"
+    for jar in ${1}/*.jar; do
+      ZEPPELIN_INTP_CLASSPATH="$jar:${ZEPPELIN_INTP_CLASSPATH}"
     done
   fi
 }
@@ -135,7 +135,7 @@ if [[ -z "${ZEPPELIN_MEM}" ]]; then
   export ZEPPELIN_MEM="-Xms1024m -Xmx1024m"
 fi
 
-if [[ -z "${ZEPPELIN_INTP_MEM}" ]]; then
+if [[ ( -z "${ZEPPELIN_INTP_MEM}" ) && ( "${ZEPPELIN_INTERPRETER_LAUNCHER}" != "yarn" ) ]]; then
   export ZEPPELIN_INTP_MEM="-Xms1024m -Xmx2048m"
 fi
 
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 7a34425..c81ca0b 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -117,6 +117,15 @@ ZEPPELIN_SERVER=org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer
 
 INTERPRETER_ID=$(basename "${INTERPRETER_DIR}")
 ZEPPELIN_PID="${ZEPPELIN_PID_DIR}/zeppelin-interpreter-${INTP_GROUP_ID}-${ZEPPELIN_IDENT_STRING}-${HOSTNAME}-${PORT}.pid"
+
+if [[ "${ZEPPELIN_INTERPRETER_LAUNCHER}" == "yarn" ]]; then
+    # LOG_DIRS is an env variable in the yarn container which points to the log dirs of the container
+    # split the log dirs to array and use the first one
+    IFS=','
+    read -ra LOG_DIRS_ARRAY <<< "${LOG_DIRS}"
+    ZEPPELIN_LOG_DIR=${LOG_DIRS_ARRAY[0]}
+fi
+
 ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/zeppelin-interpreter-${INTERPRETER_GROUP_ID}-"
 
 if [[ -z "$ZEPPELIN_IMPERSONATE_CMD" ]]; then
@@ -236,11 +245,17 @@ elif [[ "${INTERPRETER_ID}" == "flink" ]]; then
 
   if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
     ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
-    if ! [ -x "$(command -v hadoop)" ]; then
-      echo 'Error: hadoop is not in PATH when HADOOP_CONF_DIR is specified.'
-      exit 1
+    # Don't use `hadoop classpath` if a flink-shaded-hadoop jar is in the lib folder
+    flink_hadoop_shaded_jar=$(find "${FLINK_HOME}/lib" -name 'flink-shaded-hadoop-*.jar')
+    if [[ ! -z "$flink_hadoop_shaded_jar" ]]; then
+      echo ""
+    else
+      if [[ ! ( -x "$(command -v hadoop)" ) && ( "${ZEPPELIN_INTERPRETER_LAUNCHER}" != "yarn" ) ]]; then
+        echo 'Error: hadoop is not in PATH when HADOOP_CONF_DIR is specified and no flink-shaded-hadoop jar '
+        exit 1
+      fi
+      ZEPPELIN_INTP_CLASSPATH+=":`hadoop classpath`"
     fi
-    ZEPPELIN_INTP_CLASSPATH+=":`hadoop classpath`"
     export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
   else
     # autodetect HADOOP_CONF_HOME by heuristic
@@ -295,6 +310,7 @@ trap 'shutdown_hook;' SIGTERM SIGINT SIGQUIT
 function shutdown_hook() {
   local count
   count=0
+  echo "trying to shutdown..."
   while [[ "${count}" -lt 10 ]]; do
     $(kill ${pid} > /dev/null 2> /dev/null)
     if kill -0 ${pid} > /dev/null 2>&1; then
diff --git a/bin/zeppelin-daemon.sh b/bin/zeppelin-daemon.sh
index 0ce9808..1065b14 100755
--- a/bin/zeppelin-daemon.sh
+++ b/bin/zeppelin-daemon.sh
@@ -83,6 +83,21 @@ addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib"
 addJarInDir "${ZEPPELIN_HOME}/zeppelin-web/target/lib"
 addJarInDir "${ZEPPELIN_HOME}/zeppelin-web-angular/target/lib"
 
+## Add hadoop jars when env USE_HADOOP is true
+if [[ "${USE_HADOOP}" == "true"  ]]; then
+  if [[ -z "${HADOOP_CONF_DIR}" ]]; then
+    echo "Please specify HADOOP_CONF_DIR if USE_HADOOP is true"
+    exit 1
+  else
+    ZEPPELIN_CLASSPATH+=":${HADOOP_CONF_DIR}"
+    if ! [ -x "$(command -v hadoop)" ]; then
+        echo 'Error: hadoop is not in PATH when HADOOP_CONF_DIR is specified.'
+        exit 1
+    fi
+    ZEPPELIN_CLASSPATH+=":`hadoop classpath`"
+  fi
+fi
+
 CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
 
 if [[ "${ZEPPELIN_NICENESS}" = "" ]]; then
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
index 3f7e65f..a875e61 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java
@@ -108,16 +108,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter {
     flinkInterpreter.cancel(context);
     super.cancel(context);
   }
-
-  @Override
-  public void close() throws InterpreterException {
-    LOGGER.info("Close IPyFlinkInterpreter");
-    super.close();
-    if (flinkInterpreter != null) {
-      flinkInterpreter.close();
-    }
-  }
-
+  
   /**
    * Called by python process.
    */
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 3ba36d1..ff8e7a7 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -118,6 +118,7 @@ public class JobManager {
       return;
     }
 
+    boolean cancelled = false;
     try {
       String savepointDir = context.getLocalProperties().get("savepointDir");
       if (StringUtils.isBlank(savepointDir)) {
@@ -131,18 +132,23 @@ public class JobManager {
         LOGGER.info("Job {} of paragraph {} is stopped with save point path: {}",
                 jobClient.getJobID(), context.getParagraphId(), savePointPath);
       }
+      cancelled = true;
     } catch (Exception e) {
       String errorMessage = String.format("Fail to cancel job %s that is associated " +
               "with paragraph %s", jobClient.getJobID(), context.getParagraphId());
       LOGGER.warn(errorMessage, e);
       throw new InterpreterException(errorMessage, e);
     } finally {
-      FlinkJobProgressPoller jobProgressPoller = jobProgressPollerMap.remove(jobClient.getJobID());
-      if (jobProgressPoller != null) {
-        jobProgressPoller.cancel();
-        jobProgressPoller.interrupt();
+      if (cancelled) {
+        LOGGER.info("Cancelling is successful, remove the associated FlinkJobProgressPoller of paragraph: "
+                + context.getParagraphId());
+        FlinkJobProgressPoller jobProgressPoller = jobProgressPollerMap.remove(jobClient.getJobID());
+        if (jobProgressPoller != null) {
+          jobProgressPoller.cancel();
+          jobProgressPoller.interrupt();
+        }
+        this.jobs.remove(context.getParagraphId());
       }
-      this.jobs.remove(context.getParagraphId());
     }
   }
 
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
index 40d1d11..019e6d6 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java
@@ -173,14 +173,6 @@ public class PyFlinkInterpreter extends PythonInterpreter {
   }
 
   @Override
-  public void close() throws InterpreterException {
-    super.close();
-    if (flinkInterpreter != null) {
-      flinkInterpreter.close();
-    }
-  }
-
-  @Override
   public ZeppelinContext getZeppelinContext() {
     return flinkInterpreter.getZeppelinContext();
   }
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index a8728d3..cf7cc67 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -222,7 +222,8 @@ public abstract class AbstractStreamSqlJob {
       // no result anymore
       // either the job is done or an error occurred
       isRunning = false;
-      LOGGER.info("ResultRetrieval Thread is done");
+      LOGGER.info("ResultRetrieval Thread is done, isRunning={}, hasNext={}",
+              isRunning, iterator.hasNext());
       LOGGER.info("Final Result: " + buildResult());
       refreshExecutorService.shutdownNow();
     }
diff --git a/pom.xml b/pom.xml
index 6e6e232..cb81369 100644
--- a/pom.xml
+++ b/pom.xml
@@ -142,7 +142,12 @@
     <hadoop2.6.version>2.6.5</hadoop2.6.version>
     <hadoop3.0.version>3.0.3</hadoop3.0.version>
     <hadoop3.1.version>3.1.3</hadoop3.1.version>
+    <hadoop3.2.version>3.2.0</hadoop3.2.version>
+    <hadoop.version>${hadoop2.7.version}</hadoop.version>
+    
+    <hadoop.deps.scope>provided</hadoop.deps.scope>
     <quartz.scheduler.version>2.3.2</quartz.scheduler.version>
+    <jettison.version>1.4.0</jettison.version>
     <jsoup.version>1.13.1</jsoup.version>
 
     <!-- test library versions -->
@@ -338,6 +343,522 @@
         <version>${bouncycastle.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>aws-java-sdk-s3</artifactId>
+        <version>${aws.sdk.s3.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.codehaus.jettison</groupId>
+        <artifactId>jettison</artifactId>
+        <version>${jettison.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-client</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>${hadoop.deps.scope}</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-webdav</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jgit</groupId>
+            <artifactId>org.eclipse.jgit</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xml-apis</groupId>
+            <artifactId>xml-apis</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xerces</groupId>
+            <artifactId>xercesImpl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.nimbusds</groupId>
+            <artifactId>nimbus-jose-jwt</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-xml</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-util</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-configuration2</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-webapp</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-jaxb-annotations</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-api</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>${hadoop.deps.scope}</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-webdav</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jgit</groupId>
+            <artifactId>org.eclipse.jgit</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xml-apis</groupId>
+            <artifactId>xml-apis</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xerces</groupId>
+            <artifactId>xercesImpl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs</artifactId>
+        <version>${hadoop.version}</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-webdav</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jgit</groupId>
+            <artifactId>org.eclipse.jgit</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xml-apis</groupId>
+            <artifactId>xml-apis</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xerces</groupId>
+            <artifactId>xercesImpl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-util</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-common</artifactId>
+        <version>${hadoop.version}</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-webdav</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jgit</groupId>
+            <artifactId>org.eclipse.jgit</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xml-apis</groupId>
+            <artifactId>xml-apis</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xerces</groupId>
+            <artifactId>xercesImpl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-configuration2</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-util</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-webapp</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.nimbusds</groupId>
+            <artifactId>nimbus-jose-jwt</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-server-tests</artifactId>
+        <version>${hadoop.version}</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-webdav</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jgit</groupId>
+            <artifactId>org.eclipse.jgit</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xml-apis</groupId>
+            <artifactId>xml-apis</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>xerces</groupId>
+            <artifactId>xercesImpl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-jaxrs</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-xc</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-util</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP-java7</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-jaxb-annotations</artifactId>
+          </exclusion>
+<!--          <exclusion>-->
+<!--            <groupId>com.google.inject.extensions</groupId>-->
+<!--            <artifactId>guice-servlet</artifactId>-->
+<!--          </exclusion>-->
+        </exclusions>
+      </dependency>
+
       <!-- Test libraries -->
       <dependency>
         <groupId>junit</groupId>
@@ -394,7 +915,6 @@
         <version>${powermock.version}</version>
         <scope>test</scope>
       </dependency>
-
     </dependencies>
   </dependencyManagement>
 
@@ -1002,6 +1522,13 @@
     </profile>
 
     <profile>
+      <id>include-hadoop</id>
+      <properties>
+        <hadoop.deps.scope>compile</hadoop.deps.scope>
+      </properties>
+    </profile>
+
+    <profile>
       <id>build-distr</id>
       <activation>
         <activeByDefault>false</activeByDefault>
@@ -1038,9 +1565,6 @@
 
     <profile>
       <id>publish-distr</id>
-      <activation>
-        <activeByDefault>false</activeByDefault>
-      </activation>
       <build>
         <plugins>
           <plugin>
@@ -1294,6 +1818,39 @@
         </plugins>
       </build>
     </profile>
+
+    <profile>
+      <id>hadoop2</id>
+      <properties>
+        <hadoop.version>${hadoop2.7.version}</hadoop.version>
+        <curator.version>2.13.0</curator.version>
+      </properties>
+    </profile>
+
+    <profile>
+      <id>hadoop3</id>
+      <properties>
+        <hadoop.version>${hadoop3.2.version}</hadoop.version>
+        <curator.version>2.13.0</curator.version>
+        <kerberos-client.version>2.0.0-M15</kerberos-client.version>
+      </properties>
+
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aws</artifactId>
+          <version>${hadoop.version}</version>
+          <scope>${hadoop.deps.scope}</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.directory.server</groupId>
+          <artifactId>kerberos-client</artifactId>
+          <version>${kerberos-client.version}</version>
+          <scope>${hadoop.deps.scope}</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+
   </profiles>
 
 </project>
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index 07cfe63..1cf0341 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -40,7 +40,6 @@
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <hadoop.version>${hadoop2.7.version}</hadoop.version>
   </properties>
 
   <dependencies>
@@ -83,16 +82,8 @@
       <exclusions>
         <exclusion>
           <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-databind</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-annotations</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-core</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
 
@@ -103,16 +94,8 @@
       <exclusions>
         <exclusion>
           <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-databind</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-annotations</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-core</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
 
@@ -128,8 +111,36 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+
     <!--test libraries-->
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -141,6 +152,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
@@ -172,370 +190,4 @@
     </plugins>
   </build>
 
-  <profiles>
-
-    <profile>
-      <id>hadoop2</id>
-
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-          <version>${hadoop.version}</version>
-          <exclusions>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-core</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-json</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-client</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-server</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.avro</groupId>
-              <artifactId>avro</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.jackrabbit</groupId>
-              <artifactId>jackrabbit-webdav</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>commons-httpclient</groupId>
-              <artifactId>commons-httpclient</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.eclipse.jgit</groupId>
-              <artifactId>org.eclipse.jgit</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.jcraft</groupId>
-              <artifactId>jsch</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-compress</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xml-apis</groupId>
-              <artifactId>xml-apis</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xerces</groupId>
-              <artifactId>xercesImpl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.guava</groupId>
-              <artifactId>guava</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.code.findbugs</groupId>
-              <artifactId>jsr305</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-math3</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-          <version>${hadoop.version}</version>
-          <classifier>tests</classifier>
-          <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-core</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-json</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-client</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-server</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.avro</groupId>
-              <artifactId>avro</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.jackrabbit</groupId>
-              <artifactId>jackrabbit-webdav</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>commons-httpclient</groupId>
-              <artifactId>commons-httpclient</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.eclipse.jgit</groupId>
-              <artifactId>org.eclipse.jgit</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.jcraft</groupId>
-              <artifactId>jsch</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-compress</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xml-apis</groupId>
-              <artifactId>xml-apis</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xerces</groupId>
-              <artifactId>xercesImpl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-mapper-asl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-core-asl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.guava</groupId>
-              <artifactId>guava</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.code.findbugs</groupId>
-              <artifactId>jsr305</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-math3</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdfs</artifactId>
-          <version>${hadoop.version}</version>
-          <classifier>tests</classifier>
-          <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-json</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-client</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.avro</groupId>
-              <artifactId>avro</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.jackrabbit</groupId>
-              <artifactId>jackrabbit-webdav</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>commons-httpclient</groupId>
-              <artifactId>commons-httpclient</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.eclipse.jgit</groupId>
-              <artifactId>org.eclipse.jgit</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.jcraft</groupId>
-              <artifactId>jsch</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-compress</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xml-apis</groupId>
-              <artifactId>xml-apis</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xerces</groupId>
-              <artifactId>xercesImpl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.guava</groupId>
-              <artifactId>guava</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-yarn-server-tests</artifactId>
-          <version>${hadoop.version}</version>
-          <classifier>tests</classifier>
-          <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-core</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-client</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-server</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.avro</groupId>
-              <artifactId>avro</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.jackrabbit</groupId>
-              <artifactId>jackrabbit-webdav</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>commons-httpclient</groupId>
-              <artifactId>commons-httpclient</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.eclipse.jgit</groupId>
-              <artifactId>org.eclipse.jgit</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.jcraft</groupId>
-              <artifactId>jsch</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-compress</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xml-apis</groupId>
-              <artifactId>xml-apis</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xerces</groupId>
-              <artifactId>xercesImpl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-core-asl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-jaxrs</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-xc</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-mapper-asl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.guava</groupId>
-              <artifactId>guava</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.zeppelin</groupId>
-          <artifactId>spark-interpreter</artifactId>
-          <version>${project.version}</version>
-          <scope>test</scope>
-          <exclusions>
-            <!-- It is fine to exclude zeppelin-python, because it is only used at runtime when launching interpreter -->
-            <exclusion>
-              <groupId>org.apache.zeppelin</groupId>
-              <artifactId>zeppelin-python</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.zeppelin</groupId>
-              <artifactId>r</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.protobuf</groupId>
-              <artifactId>protobuf-java</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
-      <id>hadoop3</id>
-      <properties>
-        <hadoop.version>${hadoop3.1.version}</hadoop.version>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client-api</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client-runtime</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client-minicluster</artifactId>
-          <version>${hadoop.version}</version>
-          <scope>test</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-
 </project>
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/MiniHadoopCluster.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/MiniHadoopCluster.java
index 94a85a8..eb11819 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/MiniHadoopCluster.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/MiniHadoopCluster.java
@@ -65,7 +65,9 @@ public class MiniHadoopCluster {
 
     // start MiniYarnCluster
     YarnConfiguration baseConfig = new YarnConfiguration(hadoopConf);
-    baseConfig.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", "95");
+    baseConfig.set("yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage", "98");
+    baseConfig.set("yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
+
     this.yarnCluster = new MiniYARNCluster(getClass().getName(), 2,
         1, 1);
     yarnCluster.init(baseConfig);
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/YarnInterpreterLauncherIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/YarnInterpreterLauncherIntegrationTest.java
new file mode 100644
index 0000000..445ce15
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/YarnInterpreterLauncherIntegrationTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.integration;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.zeppelin.dep.Dependency;
+import org.apache.zeppelin.interpreter.ExecutionContext;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterFactory;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests that run interpreters with the yarn launcher against a MiniHadoopCluster. */
+public class YarnInterpreterLauncherIntegrationTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(YarnInterpreterLauncherIntegrationTest.class);
+
+  private static MiniHadoopCluster hadoopCluster;
+  private static MiniZeppelin zeppelin;
+  private static InterpreterFactory interpreterFactory;
+  private static InterpreterSettingManager interpreterSettingManager;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    Configuration conf = new Configuration();
+    hadoopCluster = new MiniHadoopCluster(conf);
+    hadoopCluster.start();
+
+    zeppelin = new MiniZeppelin();
+    zeppelin.start();
+    interpreterFactory = zeppelin.getInterpreterFactory();
+    interpreterSettingManager = zeppelin.getInterpreterSettingManager();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (zeppelin != null) {
+      zeppelin.stop();
+    }
+    if (hadoopCluster != null) {
+      hadoopCluster.stop();
+    }
+  }
+
+  /** Returns how many applications the mini yarn cluster's ResourceManager reports as RUNNING. */
+  private static int countRunningYarnApps() throws YarnException {
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    return response.getApplicationList().size();
+  }
+
+  @Test
+  public void testLaunchShellInYarn() throws YarnException, InterpreterException, InterruptedException {
+    InterpreterSetting shellInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("sh");
+    shellInterpreterSetting.setProperty("zeppelin.interpreter.launcher", "yarn");
+    shellInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+
+    Interpreter shellInterpreter = interpreterFactory.getInterpreter("sh", new ExecutionContext("user1", "note1", "sh"));
+
+    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
+    InterpreterResult interpreterResult = shellInterpreter.interpret("pwd", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    assertTrue(interpreterResult.toString(), interpreterResult.message().get(0).getData().contains("/usercache/"));
+
+    Thread.sleep(1000);
+    // exactly 1 yarn application should be running for the shell interpreter
+    assertEquals(1, countRunningYarnApps());
+
+    interpreterSettingManager.close();
+  }
+
+  @Test
+  public void testJdbcPython_YarnLauncher() throws InterpreterException, YarnException, InterruptedException {
+    InterpreterSetting jdbcInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("jdbc");
+    jdbcInterpreterSetting.setProperty("default.driver", "com.mysql.jdbc.Driver");
+    jdbcInterpreterSetting.setProperty("default.url", "jdbc:mysql://localhost:3306/");
+    jdbcInterpreterSetting.setProperty("default.user", "root");
+    jdbcInterpreterSetting.setProperty("zeppelin.interpreter.launcher", "yarn");
+    jdbcInterpreterSetting.setProperty("zeppelin.interpreter.yarn.resource.memory", "512");
+    jdbcInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+
+    Dependency dependency = new Dependency("mysql:mysql-connector-java:5.1.46");
+    jdbcInterpreterSetting.setDependencies(Lists.newArrayList(dependency));
+    interpreterSettingManager.restart(jdbcInterpreterSetting.getId());
+    jdbcInterpreterSetting.waitForReady(60 * 1000);
+
+    InterpreterSetting pythonInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("python");
+    pythonInterpreterSetting.setProperty("zeppelin.interpreter.launcher", "yarn");
+    pythonInterpreterSetting.setProperty("zeppelin.interpreter.yarn.resource.memory", "512");
+    pythonInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+
+    Interpreter jdbcInterpreter = interpreterFactory.getInterpreter("jdbc", new ExecutionContext("user1", "note1", "test"));
+    assertNotNull("JdbcInterpreter is null", jdbcInterpreter);
+
+    InterpreterContext context = new InterpreterContext.Builder()
+            .setNoteId("note1")
+            .setParagraphId("paragraph_1")
+            .setAuthenticationInfo(AuthenticationInfo.ANONYMOUS)
+            .build();
+    InterpreterResult interpreterResult = jdbcInterpreter.interpret("show databases;", context);
+    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+
+    context.getLocalProperties().put("saveAs", "table_1");
+    interpreterResult = jdbcInterpreter.interpret("SELECT 1 as c1, 2 as c2;", context);
+    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    assertEquals(1, interpreterResult.message().size());
+    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    assertEquals("c1\tc2\n1\t2\n", interpreterResult.message().get(0).getData());
+
+    // read table_1 (saved by the jdbc paragraph above) from the python interpreter
+    Interpreter pythonInterpreter = interpreterFactory.getInterpreter("python", new ExecutionContext("user1", "note1", "test"));
+    assertNotNull("PythonInterpreter is null", pythonInterpreter);
+
+    context = new InterpreterContext.Builder()
+            .setNoteId("note1")
+            .setParagraphId("paragraph_1")
+            .setAuthenticationInfo(AuthenticationInfo.ANONYMOUS)
+            .build();
+    interpreterResult = pythonInterpreter.interpret("df=z.getAsDataFrame('table_1')\nz.show(df)", context);
+    assertEquals(interpreterResult.toString(), InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    assertEquals(1, interpreterResult.message().size());
+    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    assertEquals("c1\tc2\n1\t2\n", interpreterResult.message().get(0).getData());
+
+    // 2 yarn applications should be running: one for jdbc, one for python
+    assertEquals(2, countRunningYarnApps());
+
+    interpreterSettingManager.close();
+
+    // sleep for 5 seconds to make sure yarn apps are finished
+    Thread.sleep(5 * 1000);
+    assertEquals(0, countRunningYarnApps());
+  }
+}
diff --git a/zeppelin-interpreter-shaded/pom.xml b/zeppelin-interpreter-shaded/pom.xml
index e4046fb..23adea0 100644
--- a/zeppelin-interpreter-shaded/pom.xml
+++ b/zeppelin-interpreter-shaded/pom.xml
@@ -63,8 +63,6 @@
               <exclude>org.slf4j:slf4j-log4j12</exclude>
               <!-- Leave commons-logging unshaded so downstream users can configure logging. -->
               <exclude>commons-logging:commons-logging</exclude>
-              <!-- Leave commons-exec unshaded so downstream users can use ProcessLauncher. -->
-              <exclude>org.apache.commons:commons-exec</exclude>
               <!-- Leave log4j unshaded so downstream users can configure logging. -->
               <exclude>log4j:log4j</exclude>
             </excludes>
@@ -104,6 +102,8 @@
               <excludes>
                 <exclude>org/apache/zeppelin/*</exclude>
                 <exclude>org/apache/zeppelin/**/*</exclude>
+                <exclude>org/apache/hadoop/*</exclude>
+                <exclude>org/apache/hadoop/**</exclude>
                 <exclude>org/slf4j/*</exclude>
                 <exclude>org/slf4j/**/*</exclude>
                 <exclude>org/apache/commons/logging/*</exclude>
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index fba105a..c91721c 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -264,6 +264,13 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <!-- Should always use provided, yarn container (YarnInterpreterLauncher) will provide all the hadoop jars -->
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index d711d3a..2d3396a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -120,6 +120,7 @@ public class RemoteInterpreterServer extends Thread
   private DistributedResourcePool resourcePool;
   private ApplicationLoader appLoader;
   private Gson gson = new Gson();
+  private String launcherEnv = System.getenv("ZEPPELIN_INTERPRETER_LAUNCHER");
 
   private String intpEventServerHost;
   private int intpEventServerPort;
@@ -241,6 +242,26 @@ public class RemoteInterpreterServer extends Thread
               }
             }
           }
+
+          if ("yarn".equals(launcherEnv)) { // equals is null-safe; launcherEnv is null when the env var is unset
+            try {
+              YarnUtils.register(host, port);
+              Thread thread = new Thread(() -> {
+                while(!Thread.interrupted() && server.isServing()) {
+                  YarnUtils.heartbeat();
+                  try {
+                    Thread.sleep(60 * 1000);
+                  } catch (InterruptedException e) {
+                    LOGGER.warn(e.getMessage(), e);
+                    Thread.currentThread().interrupt(); // sleep cleared the flag; restore it so the loop exits
+                  }
+                }
+              }, "RM-Heartbeat-Thread");
+              thread.start();
+            } catch (Exception e) {
+              LOGGER.error("Fail to register yarn app", e);
+            }
+          }
         }
       }).start();
     }
@@ -271,6 +292,14 @@ public class RemoteInterpreterServer extends Thread
         SchedulerFactory.singleton().destroy();
       }
 
+      if ("yarn".equals(launcherEnv)) {
+        try {
+          YarnUtils.unregister(true, "");
+        } catch (Exception e) {
+          LOGGER.error("Fail to unregister yarn app", e);
+        }
+      }
+
       server.stop();
 
       // server.stop() does not always finish server.serve() loop
@@ -310,7 +339,6 @@ public class RemoteInterpreterServer extends Thread
     }
   }
 
-
   public static void main(String[] args)
       throws TTransportException, InterruptedException, IOException {
     String zeppelinServerHost = null;
@@ -334,6 +362,7 @@ public class RemoteInterpreterServer extends Thread
       @Override
       public void handle(Signal signal) {
         try {
+          LOGGER.info("Receive TERM Signal");
           remoteInterpreterServer.shutdown();
         } catch (TException e) {
           LOGGER.error("Error on shutdown RemoteInterpreterServer", e);
@@ -342,6 +371,7 @@ public class RemoteInterpreterServer extends Thread
     });
 
     remoteInterpreterServer.join();
+    LOGGER.info("RemoteInterpreterServer thread is finished");
     System.exit(0);
   }
 
@@ -424,10 +454,11 @@ public class RemoteInterpreterServer extends Thread
       } catch (ClassNotFoundException | NoSuchMethodException | SecurityException
               | InstantiationException | IllegalAccessException
               | IllegalArgumentException | InvocationTargetException e) {
-        LOGGER.error(e.toString(), e);
+        LOGGER.error(e.getMessage(), e);
         throw new TException(e);
       }
     } catch (Exception e) {
+      LOGGER.error(e.getMessage(), e);
       throw new TException(e.getMessage(), e);
     }
   }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java
new file mode 100644
index 0000000..daa9a94
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+/**
+ * Static utility class used to register, heartbeat and unregister the yarn application master.
+ * It is kept in a separate class instead of being part of RemoteInterpreterServer because
+ * it is only needed when the interpreter process is launched in yarn mode. If this code lived in
+ * RemoteInterpreterServer, we would get ClassNotFoundException when not launching in yarn mode.
+ */
+public class YarnUtils {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(YarnUtils.class);
+
+  private static AMRMClient amClient = AMRMClient.createAMRMClient();
+  private static Configuration conf = new YarnConfiguration();
+
+  static {
+    amClient.init(conf);
+    amClient.start();
+  }
+
+  public static void register(String host, int port) throws Exception {
+    LOGGER.info("Registering yarn app at " + host + ":" + port);
+    try {
+      amClient.registerApplicationMaster(host, port, null);
+    } catch (YarnException e) {
+      throw new Exception("Fail to register yarn app", e);
+    }
+  }
+
+  public static void heartbeat() {
+    LOGGER.info("Heartbeating to RM");
+    try {
+      amClient.allocate(1.0f);
+    } catch (Exception e) {
+      LOGGER.warn(e.getMessage(), e);
+    }
+  }
+
+  public static void unregister(boolean succcess, String diagnostic) throws Exception {
+    LOGGER.info("Unregistering yarn app");
+    try {
+      if (succcess) {
+        amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", null);
+      } else {
+        amClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, diagnostic, null);
+      }
+    } catch (YarnException e) {
+        throw new Exception("Fail to unregister yarn app", e);
+    }
+  }
+}
diff --git a/zeppelin-plugins/launcher/yarn/pom.xml b/zeppelin-plugins/launcher/yarn/pom.xml
new file mode 100644
index 0000000..d236673
--- /dev/null
+++ b/zeppelin-plugins/launcher/yarn/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zengine-plugins-parent</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../../../zeppelin-plugins</relativePath>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>launcher-yarn</artifactId>
+  <packaging>jar</packaging>
+  <version>0.9.0-SNAPSHOT</version>
+  <name>Zeppelin: Plugin Yarn Launcher</name>
+  <description>Launcher implementation to run interpreter on yarn</description>
+
+  <properties>
+    <plugin.name>Launcher/YarnInterpreterLauncher</plugin.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <testResources>
+      <testResource>
+        <directory>${project.basedir}/src/test/resources</directory>
+      </testResource>
+      <testResource>
+        <directory>${project.basedir}/src/main/resources</directory>
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <phase>none</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+        <includes>
+          <include>**/*.*</include>
+        </includes>
+      </resource>
+    </resources>
+  </build>
+</project>
diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java
new file mode 100644
index 0000000..8eb4db7
--- /dev/null
+++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Launcher for running interpreter in yarn container.
+ */
+public class YarnInterpreterLauncher extends InterpreterLauncher {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(YarnInterpreterLauncher.class);
+
+  public YarnInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
+    super(zConf, recoveryStorage);
+  }
+
+  /** Builds the YarnRemoteInterpreterProcess for the given launch context. */
+  @Override
+  public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
+    LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
+    // Assigned first: getConnectTimeout() presumably reads this.properties — TODO confirm.
+    this.properties = context.getProperties();
+    int timeout = getConnectTimeout();
+    Map<String, String> env = buildEnvFromProperties(context);
+    return new YarnRemoteInterpreterProcess(context, properties, env, timeout);
+  }
+
+  /** Collects the env-style interpreter properties plus the two launcher marker variables. */
+  protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
+    Map<String, String> env = new HashMap<>();
+    for (Object rawKey : context.getProperties().keySet()) {
+      String name = (String) rawKey;
+      if (!RemoteInterpreterUtils.isEnvString(name)) {
+        continue;
+      }
+      env.put(name, context.getProperties().getProperty(name));
+    }
+    env.put("INTERPRETER_GROUP_ID", context.getInterpreterGroupId());
+    env.put("ZEPPELIN_INTERPRETER_LAUNCHER", "yarn");
+    return env;
+  }
+}
diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
new file mode 100644
index 0000000..1f14f58
--- /dev/null
+++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+
+/**
+ * Start interpreter in yarn container.
+ */
+public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(YarnRemoteInterpreterProcess.class);
+
+  private String host;
+  private int port = -1;
+  private ZeppelinConfiguration zConf;
+  private final InterpreterLaunchContext launchContext;
+  private final Properties properties;
+  private final Map<String, String> envs;
+  private AtomicBoolean isYarnAppRunning = new AtomicBoolean(false);
+  private String errorMessage;
+
+  /************** Hadoop related **************************/
+  private Configuration hadoopConf;
+  private FileSystem fs;
+  private FileSystem localFs;
+  private YarnClient yarnClient;
+  private ApplicationId appId;
+  private Path stagingDir;
+
+  // App files are world-wide readable and owner writable -> rw-r--r--
+  private static final FsPermission APP_FILE_PERMISSION =
+          FsPermission.createImmutable(Short.parseShort("644", 8));
+
+  /**
+   * Builds the yarn client, hadoop configuration and file system handles used to submit the
+   * interpreter; nothing is submitted to yarn until {@link #start(String)} is called.
+   */
+  public YarnRemoteInterpreterProcess(
+          InterpreterLaunchContext launchContext,
+          Properties properties,
+          Map<String, String> envs,
+          int connectTimeout) {
+    super(connectTimeout, launchContext.getIntpEventServerHost(), launchContext.getIntpEventServerPort());
+    this.zConf = ZeppelinConfiguration.create();
+    this.launchContext = launchContext;
+    this.properties = properties;
+    this.envs = envs;
+    this.yarnClient = YarnClient.createYarnClient();
+    this.hadoopConf = new YarnConfiguration();
+
+    // Layer core-site.xml and yarn-site.xml from HADOOP_CONF_DIR on top of the configuration.
+    // This is needed by the integration tests, which run against a MiniHadoopCluster.
+    String confDirValue = properties.getProperty("HADOOP_CONF_DIR");
+    if (properties.containsKey("HADOOP_CONF_DIR") &&
+            !org.apache.commons.lang3.StringUtils.isBlank(confDirValue)) {
+      File hadoopConfDir = new File(confDirValue);
+      if (!hadoopConfDir.exists() || !hadoopConfDir.isDirectory()) {
+        throw new RuntimeException("HADOOP_CONF_DIR: " + hadoopConfDir.getAbsolutePath() +
+                " doesn't exist or is not a directory");
+      }
+      for (String siteFileName : new String[] {"core-site.xml", "yarn-site.xml"}) {
+        File siteFile = new File(hadoopConfDir, siteFileName);
+        try {
+          this.hadoopConf.addResource(siteFile.toURI().toURL());
+        } catch (MalformedURLException e) {
+          LOGGER.warn("Fail to add " + siteFileName + ": " + siteFile.getAbsolutePath(), e);
+        }
+      }
+    }
+
+    yarnClient.init(this.hadoopConf);
+    yarnClient.start();
+    try {
+      this.fs = FileSystem.get(hadoopConf);
+      this.localFs = FileSystem.getLocal(hadoopConf);
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to create FileSystem", e);
+    }
+  }
+
+  @Override
+  public void processStarted(int port, String host) {
+    this.port = port;  // later returned by getPort()
+    this.host = host;  // later returned by getHost()
+  }
+
+  @Override
+  public String getErrorMessage() {
+    return this.errorMessage;  // yarn diagnostics saved by start() when the app did not reach RUNNING
+  }
+
+  @Override
+  public String getInterpreterGroupId() {
+    return launchContext.getInterpreterGroupId();  // read from the launch context given at construction
+  }
+
+  @Override
+  public String getInterpreterSettingName() {
+    return launchContext.getInterpreterSettingName();  // read from the launch context given at construction
+  }
+
+  /** Submits the interpreter app to yarn and waits until it is RUNNING, ended, or timed out. */
+  @Override
+  public void start(String userName) throws IOException {
+    try {
+      LOGGER.info("Submitting zeppelin-interpreter app to yarn");
+      final YarnClientApplication yarnApplication = yarnClient.createApplication();
+      this.appId = yarnApplication.getNewApplicationResponse().getApplicationId();
+      ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
+      yarnClient.submitApplication(createApplicationSubmissionContext(appContext));
+      long start = System.currentTimeMillis();
+      ApplicationReport appReport = getApplicationReport(appId);
+      while (appReport.getYarnApplicationState() != YarnApplicationState.FAILED &&
+              appReport.getYarnApplicationState() != YarnApplicationState.FINISHED &&
+              appReport.getYarnApplicationState() != YarnApplicationState.KILLED &&
+              appReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
+        LOGGER.info("Wait for zeppelin interpreter yarn app to be started");
+        Thread.sleep(2000);
+        if ((System.currentTimeMillis() - start) > getConnectTimeout()) {
+          yarnClient.killApplication(this.appId);
+          throw new IOException("Launching zeppelin interpreter in yarn is time out, kill it now");
+        }
+        appReport = getApplicationReport(appId);
+      }
+      if (appReport.getYarnApplicationState() != YarnApplicationState.RUNNING) {
+        this.errorMessage = appReport.getDiagnostics();
+        throw new Exception("Failed to submit application to YARN, applicationId=" + appId
+                + ", diagnostics=" + appReport.getDiagnostics());
+      }
+      isYarnAppRunning.set(true);
+    } catch (Exception e) {
+      LOGGER.error("Fail to launch yarn interpreter process", e);
+      throw new IOException(e);
+    } finally {
+      try {
+        if (stagingDir != null) {
+          this.fs.delete(stagingDir, true);
+        }
+      } catch (IOException e) {
+        // Cleanup is best effort; it must not hide the launch result or the original failure.
+        LOGGER.warn("Fail to delete staging dir " + stagingDir, e);
+      }
+    }
+  }
+
+  private ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
+    ApplicationReport report = yarnClient.getApplicationReport(appId);
+    if (report.getYarnApplicationState() == null) {
+      // The state can be null when the ResourceManager does not know about the app but the YARN
+      // application history server has an incomplete entry for it. Treat this scenario as if the
+      // application does not exist, since the final app status cannot be determined. This also
+      // matches the behavior for this scenario if the history server was not configured.
+      throw new ApplicationNotFoundException("YARN reports no state for application "
+              + appId);
+    }
+    return report;
+  }
+
+  /**
+   * Populates the given submission context (resources, queue, name, AM container) and returns it.
+   */
+  private ApplicationSubmissionContext createApplicationSubmissionContext(
+          ApplicationSubmissionContext appContext) throws Exception {
+    setResources(appContext);
+    setPriority(appContext);
+    setQueue(appContext);
+    appContext.setApplicationId(appId);
+    setApplicationName(appContext);
+    appContext.setApplicationType("ZEPPELIN INTERPRETER");
+    appContext.setMaxAppAttempts(1);
+    appContext.setAMContainerSpec(setUpAMLaunchContext());
+    appContext.setCancelTokensWhenComplete(true);
+    return appContext;
+  }
+
+  /** Builds the AM container spec: localized archives, launch command and environment. */
+  private ContainerLaunchContext setUpAMLaunchContext() throws IOException {
+    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+    boolean isFlink = "flink".equals(launchContext.getInterpreterSettingGroup());
+
+    // Set the resources to localize
+    this.stagingDir = new Path(fs.getHomeDirectory() + "/.zeppelinStaging", appId.toString());
+    Map<String, LocalResource> localResources = new HashMap<>();
+    stageArchive(createInterpreterZip(), "zeppelin", localResources);
+    // TODO(zjffdu) Should not add interpreter specific logic here.
+    if (isFlink) {
+      stageArchive(createFlinkZip(), "flink", localResources);
+    }
+    amContainer.setLocalResources(localResources);
+
+    // Setup the command to run the AM
+    List<String> vargs = new ArrayList<>();
+    vargs.add(ApplicationConstants.Environment.PWD.$() + "/zeppelin/bin/interpreter.sh");
+    vargs.add("-d");
+    vargs.add(ApplicationConstants.Environment.PWD.$() + "/zeppelin/interpreter/"
+            + launchContext.getInterpreterSettingGroup());
+    vargs.add("-c");
+    vargs.add(launchContext.getIntpEventServerHost());
+    vargs.add("-p");
+    vargs.add(launchContext.getIntpEventServerPort() + "");
+    vargs.add("-r");
+    vargs.add(zConf.getInterpreterPortRange() + "");
+    vargs.add("-i");
+    vargs.add(launchContext.getInterpreterGroupId());
+    vargs.add("-l");
+    vargs.add(ApplicationConstants.Environment.PWD.$() + "/zeppelin/" +
+            ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO.getStringValue()
+            + "/" + launchContext.getInterpreterSettingName());
+    vargs.add("-g");
+    vargs.add(launchContext.getInterpreterSettingName());
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+            File.separator + ApplicationConstants.STDOUT);
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+            File.separator + ApplicationConstants.STDERR);
+    amContainer.setCommands(vargs);
+
+    // pass the interpreter ENV to yarn container and also add hadoop jars to CLASSPATH
+    populateHadoopClasspath(this.envs);
+    if (isFlink) {
+      // Update the flink related env because the all these are different in yarn container
+      this.envs.put("FLINK_HOME", ApplicationConstants.Environment.PWD.$() + "/flink");
+      this.envs.put("FLINK_CONF_DIR", ApplicationConstants.Environment.PWD.$() + "/flink/conf");
+      this.envs.put("FLINK_LIB_DIR", ApplicationConstants.Environment.PWD.$() + "/flink/lib");
+      this.envs.put("FLINK_PLUGINS_DIR", ApplicationConstants.Environment.PWD.$() + "/flink/plugins");
+    }
+    // set -Xmx
+    int memory = Integer.parseInt(
+            properties.getProperty("zeppelin.interpreter.yarn.resource.memory", "1024"));
+    this.envs.put("ZEPPELIN_INTP_MEM", "-Xmx" + memory + "m");
+    amContainer.setEnvironment(this.envs);
+    return amContainer;
+  }
+
+  /** Uploads a local zip to the staging dir as an ARCHIVE resource; always deletes the zip. */
+  private void stageArchive(File zip, String link, Map<String, LocalResource> localResources)
+          throws IOException {
+    try {
+      Path srcPath = localFs.makeQualified(new Path(zip.toURI()));
+      Path destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
+      addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, link);
+    } finally {
+      FileUtils.deleteQuietly(zip);
+    }
+  }
+
+  /**
+   * Populate the classpath entry in the given environment map with any application
+   * classpath specified through the Hadoop and Yarn configurations.
+   */
+  private void populateHadoopClasspath(Map<String, String> envs) {
+    List<String> yarnClassPath = Lists.newArrayList(getYarnAppClasspath());
+    List<String> mrClassPath = Lists.newArrayList(getMRAppClasspath());
+    yarnClassPath.addAll(mrClassPath);
+    LOGGER.info("Adding hadoop classpath: " + org.apache.commons.lang3.StringUtils.join(yarnClassPath, ":"));
+    for (String path : yarnClassPath) {
+      String newValue = path;
+      if (envs.containsKey(ApplicationConstants.Environment.CLASSPATH.name())) {
+        newValue = envs.get(ApplicationConstants.Environment.CLASSPATH.name()) +
+                ApplicationConstants.CLASS_PATH_SEPARATOR + newValue;
+      }
+      envs.put(ApplicationConstants.Environment.CLASSPATH.name(), newValue);
+    }
+  }
+
+  private String[] getYarnAppClasspath() {
+    String[] configured = hadoopConf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
+    // Fall back to the built-in default when nothing is configured.
+    if (configured != null && configured.length > 0) {
+      return configured;
+    }
+    return getDefaultYarnApplicationClasspath();
+  }
+
+  private String[] getMRAppClasspath() {
+    String[] configured = hadoopConf.getStrings("mapreduce.application.classpath");
+    // Fall back to the built-in default when nothing is configured.
+    if (configured != null && configured.length > 0) {
+      return configured;
+    }
+    return getDefaultMRApplicationClasspath();
+  }
+
+  private String[] getDefaultYarnApplicationClasspath() {
+    return YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH;  // used when the configuration has no entries
+  }
+
+  private String[] getDefaultMRApplicationClasspath() {
+    return StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);  // default constant turned into an array
+  }
+
+  private void setResources(ApplicationSubmissionContext appContext) {
+    int memory = Integer.parseInt(
+            properties.getProperty("zeppelin.interpreter.yarn.resource.memory", "1024"));
+    int memoryOverHead = Integer.parseInt(
+            properties.getProperty("zeppelin.interpreter.yarn.resource.memoryOverhead", "384"));
+    if (memoryOverHead < memory * 0.1) {  // NOTE(review): resets to 384 even if a larger value
+      memoryOverHead = 384;               // was configured; was max(384, 10% of memory) meant?
+    }
+    int cores = Integer.parseInt(
+            properties.getProperty("zeppelin.interpreter.yarn.resource.cores", "1"));
+    final Resource resource = Resource.newInstance(memory + memoryOverHead, cores);  // request = memory + overhead
+    appContext.setResource(resource);
+  }
+
+  private void setPriority(ApplicationSubmissionContext appContext) {
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(1);  // every interpreter app is submitted with the same fixed priority
+    appContext.setPriority(pri);
+  }
+
+  private void setQueue(ApplicationSubmissionContext appContext) {
+    String queue = properties.getProperty("zeppelin.interpreter.yarn.queue", "default");
+    appContext.setQueue(queue);  // "default" unless zeppelin.interpreter.yarn.queue is set
+  }
+
+  private void setApplicationName(ApplicationSubmissionContext appContext) {
+    appContext.setApplicationName("Zeppelin Interpreter " + launchContext.getInterpreterGroupId());  // fixed prefix + group id
+  }
+
+  /**
+   * Recursively adds srcFile to the zip; directories contribute only the files beneath them.
+   * @param zos zip stream to write to
+   * @param srcFile file or directory to add; ignored when null or nonexistent
+   * @param parentDirectoryName entry-name prefix inside the zip; null or empty for the zip root
+   * @throws IOException if a directory cannot be listed or a file cannot be copied into the zip
+   */
+  private void addFileToZipStream(ZipOutputStream zos, File srcFile, String parentDirectoryName)
+          throws IOException {
+    if (srcFile == null || !srcFile.exists()) {
+      return;
+    }
+    boolean atRoot = parentDirectoryName == null || parentDirectoryName.isEmpty();
+    String zipEntryName = atRoot ? srcFile.getName() : parentDirectoryName + "/" + srcFile.getName();
+    if (!srcFile.isDirectory()) {
+      zos.putNextEntry(new ZipEntry(zipEntryName));
+      Files.copy(srcFile, zos);
+      zos.closeEntry();
+      return;
+    }
+    File[] children = srcFile.listFiles();
+    if (children == null) {
+      throw new IOException("Fail to list directory: " + srcFile.getAbsolutePath());
+    }
+    for (File child : children) {
+      addFileToZipStream(zos, child, zipEntryName);
+    }
+  }
+
+  /**
+   * Creates the interpreter zip: ZEPPELIN_HOME/bin, ZEPPELIN_HOME/conf, the interpreter's own
+   * directory, its local-repo directory (if present) and the zeppelin-interpreter-shaded jar.
+   * @return the zip as a local temp file, to be deleted by the caller once uploaded
+   * @throws IOException if ZEPPELIN_HOME is unset, the shaded jar is missing or ambiguous,
+   *                     or writing the zip fails
+   */
+  private File createInterpreterZip() throws IOException {
+    // Validate everything first so that a bad setup fails before any temp file is created.
+    String zeppelinHomeEnv = System.getenv("ZEPPELIN_HOME");
+    if (org.apache.commons.lang3.StringUtils.isBlank(zeppelinHomeEnv)) {
+      throw new IOException("ZEPPELIN_HOME is not specified");
+    }
+    File zeppelinHome = new File(zeppelinHomeEnv);
+    // listFiles() returns null when the interpreter directory is missing or unreadable.
+    File[] interpreterShadedFiles = new File(zeppelinHome, "interpreter").listFiles(
+            file -> file.getName().startsWith("zeppelin-interpreter-shaded")
+                    && file.getName().endsWith(".jar"));
+    if (interpreterShadedFiles == null || interpreterShadedFiles.length == 0) {
+      throw new IOException("No zeppelin-interpreter-shaded jar found under " +
+              zeppelinHome.getAbsolutePath() + "/interpreter");
+    }
+    if (interpreterShadedFiles.length > 1) {
+      throw new IOException("More than 1 zeppelin-interpreter-shaded jars found under "
+              + zeppelinHome.getAbsolutePath() + "/interpreter");
+    }
+
+    // Plain temp file: a dedicated temp directory would be left behind after the zip is deleted.
+    File interpreterArchive = File.createTempFile("zeppelin_interpreter_", ".zip");
+    try (ZipOutputStream interpreterZipStream =
+                 new ZipOutputStream(new FileOutputStream(interpreterArchive))) {
+      interpreterZipStream.setLevel(0);
+      addFileToZipStream(interpreterZipStream, new File(zeppelinHome, "bin"), null);
+      addFileToZipStream(interpreterZipStream, new File(zeppelinHome, "conf"), null);
+      File interpreterDir = new File(zeppelinHome, "interpreter/" + launchContext.getInterpreterSettingGroup());
+      addFileToZipStream(interpreterZipStream, interpreterDir, "interpreter");
+
+      File localRepoDir = new File(zConf.getInterpreterLocalRepoPath() + "/"
+              + launchContext.getInterpreterSettingName());
+      if (localRepoDir.exists() && localRepoDir.isDirectory()) {
+        LOGGER.debug("Adding localRepoDir {} to interpreter zip: ", localRepoDir.getAbsolutePath());
+        addFileToZipStream(interpreterZipStream, localRepoDir, "local-repo");
+      }
+      // add zeppelin-interpreter-shaded jar
+      addFileToZipStream(interpreterZipStream, interpreterShadedFiles[0], "interpreter");
+    } catch (IOException | RuntimeException e) {
+      FileUtils.deleteQuietly(interpreterArchive); // do not leave a partial zip behind
+      throw e;
+    }
+    return interpreterArchive;
+  }
+
+  /** Zips the content of the FLINK_HOME found in envs into a local temp file and returns it. */
+  private File createFlinkZip() throws IOException {
+    String flinkHomeEnv = envs.get("FLINK_HOME");
+    File[] flinkFiles = flinkHomeEnv == null ? null : new File(flinkHomeEnv).listFiles();
+    if (flinkFiles == null) {
+      throw new IOException("FLINK_HOME " + flinkHomeEnv + " doesn't exist or is not a directory.");
+    }
+    File flinkArchive = File.createTempFile("flink_", ".zip");
+    try (ZipOutputStream flinkZipStream = new ZipOutputStream(new FileOutputStream(flinkArchive))) {
+      flinkZipStream.setLevel(0);
+      for (File file : flinkFiles) {
+        addFileToZipStream(flinkZipStream, file, null);
+      }
+    } catch (IOException | RuntimeException e) {
+      FileUtils.deleteQuietly(flinkArchive);
+      throw e;
+    }
+    return flinkArchive;
+  }
+
+  private Path copyFileToRemote(
+          Path destDir,
+          Path srcPath,
+          Short replication) throws IOException {
+    FileSystem destFs = destDir.getFileSystem(hadoopConf);
+    FileSystem srcFs = srcPath.getFileSystem(hadoopConf);
+
+    Path destPath = new Path(destDir, srcPath.getName());
+    LOGGER.info("Uploading resource " + srcPath + " to " + destPath);
+    FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf);
+    destFs.setReplication(destPath, replication);
+    destFs.setPermission(destPath, APP_FILE_PERMISSION);
+
+    return destPath;
+  }
+
+  private void addResource(
+          FileSystem fs,
+          Path destPath,
+          Map<String, LocalResource> localResources,
+          LocalResourceType resourceType,
+          String link) throws IOException {
+
+    FileStatus destStatus = fs.getFileStatus(destPath);
+    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+    amJarRsrc.setType(resourceType);
+    amJarRsrc.setVisibility(LocalResourceVisibility.PUBLIC);
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath));
+    amJarRsrc.setTimestamp(destStatus.getModificationTime());
+    amJarRsrc.setSize(destStatus.getLen());
+    localResources.put(link, amJarRsrc);
+  }
+
+  /** Asks the remote interpreter to shut down, closes the connection and stops the yarn client. */
+  @Override
+  public void stop() {
+    if (isRunning()) {
+      LOGGER.info("Kill interpreter process");
+      try {
+        callRemoteFunction(client -> {
+          client.shutdown();
+          return null;
+        });
+      } catch (Exception e) {
+        LOGGER.warn("ignore the exception when shutting down", e);
+      }
+      shutdown(); // Shutdown connection
+      // Otherwise isRunning() keeps reporting true and a second stop() repeats the remote call.
+      isYarnAppRunning.set(false);
+    }
+    yarnClient.stop();
+    LOGGER.info("Remote process terminated");
+  }
+
+  @Override
+  public String getHost() {
+    return this.host;  // null until processStarted() has been called
+  }
+
+  @Override
+  public int getPort() {
+    return this.port;  // -1 until processStarted() has been called
+  }
+
+  @Override
+  public boolean isRunning() {
+    return isYarnAppRunning.get();  // set to true by start() once yarn reports the app as RUNNING
+  }
+}
diff --git a/zeppelin-plugins/notebookrepo/filesystem/pom.xml b/zeppelin-plugins/notebookrepo/filesystem/pom.xml
index f7f39a4..a3f1d34 100644
--- a/zeppelin-plugins/notebookrepo/filesystem/pom.xml
+++ b/zeppelin-plugins/notebookrepo/filesystem/pom.xml
@@ -40,6 +40,13 @@
         <plugin.name>NotebookRepo/FileSystemNotebookRepo</plugin.name>
     </properties>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+        </dependency>
+    </dependencies>
+
     <build>
         <plugins>
             <plugin>
diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml
index 5629f16..5e5168d 100644
--- a/zeppelin-plugins/pom.xml
+++ b/zeppelin-plugins/pom.xml
@@ -48,6 +48,7 @@
         <module>launcher/k8s-standard</module>
         <module>launcher/cluster</module>
         <module>launcher/docker</module>
+        <module>launcher/yarn</module>
         <module>launcher/flink</module>
     </modules>
 
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index 4b25553..b53b502 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -42,7 +42,6 @@
     <javax.ws.rsapi.version>2.1</javax.ws.rsapi.version>
     <libpam4j.version>1.11</libpam4j.version>
     <jna.version>4.1.0</jna.version>
-    <commons.configuration2.version>2.2</commons.configuration2.version>
 
     <!--test library versions-->
     <selenium.java.version>2.48.2</selenium.java.version>
@@ -120,10 +119,6 @@
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-databind</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>com.fasterxml.jackson.core</groupId>
-          <artifactId>jackson-core</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -135,6 +130,10 @@
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-annotations</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -180,19 +179,6 @@
       <version>1.9.4</version>
     </dependency>
 
-    <!-- Needed for string interpolation in shiro.ini -->
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-configuration2</artifactId>
-      <version>${commons.configuration2.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.commons</groupId>
-          <artifactId>commons-lang3</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.shiro</groupId>
       <artifactId>shiro-web</artifactId>
@@ -258,6 +244,30 @@
       <version>${quartz.scheduler.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-zengine</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <!--test libraries-->
     <dependency>
       <groupId>junit</groupId>
@@ -370,7 +380,7 @@
       <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration combine.children="append">
-          <argLine>-Xmx2g -Xms1g -Dfile.encoding=UTF-8</argLine>
+          <argLine>-Xmx3g -Xms1g -Dfile.encoding=UTF-8</argLine>
           <excludes>
             <exclude>${tests.to.exclude}</exclude>
           </excludes>
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 6552d0f..5658c01 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -35,6 +35,8 @@ import org.apache.commons.httpclient.methods.PutMethod;
 import org.apache.commons.httpclient.methods.RequestEntity;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.apache.zeppelin.interpreter.integration.DownloadUtils;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.plugin.PluginManager;
 import org.apache.zeppelin.utils.TestUtils;
@@ -200,6 +202,7 @@ public abstract class AbstractTestRestApi {
       System.setProperty(
           ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT.getVarName(),
           "spark");
+      
       notebookDir = new File(zeppelinHome.getAbsolutePath() + "/notebook_" + testClassName);
       if (cleanData) {
         FileUtils.deleteDirectory(notebookDir);
@@ -256,6 +259,12 @@ public abstract class AbstractTestRestApi {
       }
 
       LOG.info("Zeppelin Server is started.");
+
+      // set up spark interpreter
+      String sparkHome = DownloadUtils.downloadSpark("2.4.4");
+      InterpreterSettingManager interpreterSettingManager = TestUtils.getInstance(InterpreterSettingManager.class);
+      InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+      interpreterSetting.setProperty("SPARK_HOME", sparkHome);
     }
   }
 
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 4ef93c6..9977d68 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -37,17 +37,14 @@
 
   <properties>
     <!--library versions-->
-    <hadoop.version>${hadoop2.7.version}</hadoop.version>
     <jackrabbit.webdav.version>1.5.2</jackrabbit.webdav.version>
     <lucene.version>5.3.1</lucene.version>
     <org.reflections.version>0.9.8</org.reflections.version>
     <xml.apis.version>1.4.01</xml.apis.version>
     <frontend.maven.plugin.version>1.3</frontend.maven.plugin.version>
-    <aws.sdk.s3.version>1.11.736</aws.sdk.s3.version>
     <commons.vfs2.version>2.2</commons.vfs2.version>
     <eclipse.jgit.version>4.5.4.201711221230-r</eclipse.jgit.version>
-    <jettison.version>1.4.0</jettison.version>
-    <kerberos-client.version>2.0.0-M15</kerberos-client.version>
+    <commons.configuration2.version>2.2</commons.configuration2.version>
     <!--test library versions-->
     <google.truth.version>0.27</google.truth.version>
     <google.testing.nio.version>0.32.0-alpha</google.testing.nio.version>
@@ -207,6 +204,19 @@
       </exclusions>
     </dependency>
 
+    <!-- Needed for string interpolation in shiro.ini -->
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-configuration2</artifactId>
+      <version>${commons.configuration2.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-lang3</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <dependency>
       <groupId>org.eclipse.jgit</groupId>
       <artifactId>org.eclipse.jgit</artifactId>
@@ -214,6 +224,16 @@
     </dependency>
 
     <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -309,377 +329,4 @@
       </plugin>
     </plugins>
   </build>
-
-  <profiles>
-
-    <profile>
-      <id>hadoop2</id>
-
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
-
-      <dependencies>
-        <dependency>
-          <groupId>com.amazonaws</groupId>
-          <artifactId>aws-java-sdk-s3</artifactId>
-          <version>${aws.sdk.s3.version}</version>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-          <version>${hadoop.version}</version>
-          <exclusions>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-core</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-json</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-client</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-server</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.avro</groupId>
-              <artifactId>avro</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.jackrabbit</groupId>
-              <artifactId>jackrabbit-webdav</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty-all</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>commons-httpclient</groupId>
-              <artifactId>commons-httpclient</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.eclipse.jgit</groupId>
-              <artifactId>org.eclipse.jgit</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.jcraft</groupId>
-              <artifactId>jsch</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-compress</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xml-apis</groupId>
-              <artifactId>xml-apis</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xerces</groupId>
-              <artifactId>xercesImpl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.guava</groupId>
-              <artifactId>guava</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.code.findbugs</groupId>
-              <artifactId>jsr305</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-math3</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-
-          <dependency>
-              <groupId>org.apache.hadoop</groupId>
-              <artifactId>hadoop-common</artifactId>
-              <version>${hadoop.version}</version>
-              <classifier>tests</classifier>
-              <scope>test</scope>
-              <exclusions>
-                  <exclusion>
-                      <groupId>com.sun.jersey</groupId>
-                      <artifactId>jersey-core</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>com.sun.jersey</groupId>
-                      <artifactId>jersey-json</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>com.sun.jersey</groupId>
-                      <artifactId>jersey-client</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>com.sun.jersey</groupId>
-                      <artifactId>jersey-server</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>javax.servlet</groupId>
-                      <artifactId>servlet-api</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>org.apache.avro</groupId>
-                      <artifactId>avro</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>org.apache.jackrabbit</groupId>
-                      <artifactId>jackrabbit-webdav</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>io.netty</groupId>
-                      <artifactId>netty</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>commons-httpclient</groupId>
-                      <artifactId>commons-httpclient</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>org.eclipse.jgit</groupId>
-                      <artifactId>org.eclipse.jgit</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>com.jcraft</groupId>
-                      <artifactId>jsch</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>org.apache.commons</groupId>
-                      <artifactId>commons-compress</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>xml-apis</groupId>
-                      <artifactId>xml-apis</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>xerces</groupId>
-                      <artifactId>xercesImpl</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>org.codehaus.jackson</groupId>
-                      <artifactId>jackson-mapper-asl</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>org.codehaus.jackson</groupId>
-                      <artifactId>jackson-core-asl</artifactId>
-                  </exclusion>
-                  <exclusion>
-                      <groupId>com.google.guava</groupId>
-                      <artifactId>guava</artifactId>
-                  </exclusion>
-                <exclusion>
-                  <groupId>com.google.code.findbugs</groupId>
-                  <artifactId>jsr305</artifactId>
-                </exclusion>
-                <exclusion>
-                  <groupId>org.apache.commons</groupId>
-                  <artifactId>commons-math3</artifactId>
-                </exclusion>
-              </exclusions>
-          </dependency>
-
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdfs</artifactId>
-          <version>${hadoop.version}</version>
-          <classifier>tests</classifier>
-          <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-json</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-client</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.avro</groupId>
-              <artifactId>avro</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.jackrabbit</groupId>
-              <artifactId>jackrabbit-webdav</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>commons-httpclient</groupId>
-              <artifactId>commons-httpclient</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.eclipse.jgit</groupId>
-              <artifactId>org.eclipse.jgit</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.jcraft</groupId>
-              <artifactId>jsch</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-compress</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xml-apis</groupId>
-              <artifactId>xml-apis</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xerces</groupId>
-              <artifactId>xercesImpl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.guava</groupId>
-              <artifactId>guava</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty-all</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-yarn-server-tests</artifactId>
-          <version>${hadoop.version}</version>
-          <classifier>tests</classifier>
-          <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-core</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-client</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.sun.jersey</groupId>
-              <artifactId>jersey-server</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>javax.servlet</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.avro</groupId>
-              <artifactId>avro</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.jackrabbit</groupId>
-              <artifactId>jackrabbit-webdav</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>commons-httpclient</groupId>
-              <artifactId>commons-httpclient</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.eclipse.jgit</groupId>
-              <artifactId>org.eclipse.jgit</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.jcraft</groupId>
-              <artifactId>jsch</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.apache.commons</groupId>
-              <artifactId>commons-compress</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xml-apis</groupId>
-              <artifactId>xml-apis</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>xerces</groupId>
-              <artifactId>xercesImpl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-core-asl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-jaxrs</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-xc</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>org.codehaus.jackson</groupId>
-              <artifactId>jackson-mapper-asl</artifactId>
-            </exclusion>
-            <exclusion>
-              <groupId>com.google.guava</groupId>
-              <artifactId>guava</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-      </dependencies>
-    </profile>
-
-    <profile>
-      <id>hadoop3</id>
-      <properties>
-        <hadoop.version>3.0.0</hadoop.version>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client-api</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client-runtime</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client-minicluster</artifactId>
-          <version>${hadoop.version}</version>
-          <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.codehaus.jettison</groupId>
-          <artifactId>jettison</artifactId>
-          <version>${jettison.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-aws</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-		<dependency>
-		  <groupId>org.apache.directory.server</groupId>
-		  <artifactId>kerberos-client</artifactId>
-		  <version>${kerberos-client.version}</version>
-	    </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
 </project>
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 6b3eba1..e5ab0b2 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -141,10 +141,6 @@ public class InterpreterSetting {
 
   private transient ZeppelinConfiguration conf = new ZeppelinConfiguration();
 
-  // TODO(zjffdu) ShellScriptLauncher is the only launcher implemention for now. It could be other
-  // launcher in future when we have other launcher implementation. e.g. third party launcher
-  // service like livy
-  private transient InterpreterLauncher launcher;
   private transient LifecycleManager lifecycleManager;
   private transient RecoveryStorage recoveryStorage;
   private transient RemoteInterpreterEventServer interpreterEventServer;
@@ -309,9 +305,9 @@ public class InterpreterSetting {
     this.conf = o.getConf();
   }
 
-  private void createLauncher() throws IOException {
-    this.launcher = PluginManager.get().loadInterpreterLauncher(
-        getLauncherPlugin(), recoveryStorage);
+  private InterpreterLauncher createLauncher(Properties properties) throws IOException {
+    return PluginManager.get().loadInterpreterLauncher(  // result is returned, not cached in a field
+        getLauncherPlugin(properties), recoveryStorage);  // launcher name is chosen from properties
   }
 
   public AngularObjectRegistryListener getAngularObjectRegistryListener() {
@@ -783,7 +779,7 @@ public class InterpreterSetting {
     this.interpreterRunner = interpreterRunner;
   }
 
-  public String getLauncherPlugin() {
+  public String getLauncherPlugin(Properties properties) {
     if (isRunningOnKubernetes()) {
       return "K8sStandardInterpreterLauncher";
     } else if (isRunningOnCluster()) {
@@ -791,11 +787,19 @@ public class InterpreterSetting {
     } if (isRunningOnDocker()) {
       return "DockerInterpreterLauncher";
     } else {
+      String launcher = properties.getProperty("zeppelin.interpreter.launcher");
+      LOGGER.debug("zeppelin.interpreter.launcher: " + launcher);
       if (group.equals("spark")) {
         return "SparkInterpreterLauncher";
       } else if (group.equals("flink")) {
+        if ("yarn".equals(launcher)) {
+          return "YarnInterpreterLauncher";
+        }
         return "FlinkInterpreterLauncher";
       } else {
+        if ("yarn".equals(launcher)) {
+          return "YarnInterpreterLauncher";
+        }
         return "StandardInterpreterLauncher";
       }
     }
@@ -860,9 +864,7 @@ public class InterpreterSetting {
                                                                  String userName,
                                                                  Properties properties)
       throws IOException {
-    if (launcher == null) {
-      createLauncher();
-    }
+    InterpreterLauncher launcher = createLauncher(properties);
     InterpreterLaunchContext launchContext = new
         InterpreterLaunchContext(properties, option, interpreterRunner, userName,
         interpreterGroupId, id, group, name, interpreterEventServer.getPort(), interpreterEventServer.getHost());
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/conf/ZeppelinConfigurationTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/conf/ZeppelinConfigurationTest.java
index 001860b..08e8255 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/conf/ZeppelinConfigurationTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/conf/ZeppelinConfigurationTest.java
@@ -58,7 +58,7 @@ public class ZeppelinConfigurationTest {
   @Test
   public void getAllowedOriginsNoneTest() throws MalformedURLException, ConfigurationException {
 
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     List<String> origins = conf.getAllowedOrigins();
     Assert.assertEquals(1, origins.size());
   }
@@ -66,7 +66,7 @@ public class ZeppelinConfigurationTest {
   @Test
   public void isWindowsPathTestTrue() throws ConfigurationException {
 
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     Boolean isIt = conf.isWindowsPath("c:\\test\\file.txt");
     Assert.assertTrue(isIt);
   }
@@ -74,7 +74,7 @@ public class ZeppelinConfigurationTest {
   @Test
   public void isWindowsPathTestFalse() throws ConfigurationException {
 
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     Boolean isIt = conf.isWindowsPath("~/test/file.xml");
     Assert.assertFalse(isIt);
   }
@@ -82,7 +82,7 @@ public class ZeppelinConfigurationTest {
   @Test
   public void isPathWithSchemeTestTrue() throws ConfigurationException {
 
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     Boolean isIt = conf.isPathWithScheme("hdfs://hadoop.example.com/zeppelin/notebook");
     Assert.assertTrue(isIt);
   }
@@ -90,7 +90,7 @@ public class ZeppelinConfigurationTest {
   @Test
   public void isPathWithSchemeTestFalse() throws ConfigurationException {
 
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     Boolean isIt = conf.isPathWithScheme("~/test/file.xml");
     Assert.assertFalse(isIt);
   }
@@ -98,14 +98,14 @@ public class ZeppelinConfigurationTest {
   @Test
   public void isPathWithInvalidSchemeTest() throws ConfigurationException {
 
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     Boolean isIt = conf.isPathWithScheme("c:\\test\\file.txt");
     Assert.assertFalse(isIt);
   }
 
   @Test
   public void getNotebookDirTest() throws ConfigurationException {
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     String notebookLocation = conf.getNotebookDir();
     assertTrue(notebookLocation.endsWith("notebook"));
   }
@@ -113,7 +113,7 @@ public class ZeppelinConfigurationTest {
   @Test
   public void isNotebookPublicTest() throws ConfigurationException {
 
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     boolean isIt = conf.isNotebookPublic();
     assertTrue(isIt);
   }
@@ -121,7 +121,7 @@ public class ZeppelinConfigurationTest {
   @Test
   public void getPathTest() throws ConfigurationException {
     System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), "/usr/lib/zeppelin");
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     Assert.assertEquals("/usr/lib/zeppelin", conf.getZeppelinHome());
     Assert.assertEquals("/usr/lib/zeppelin/conf", conf.getConfDir());
   }
@@ -130,7 +130,7 @@ public class ZeppelinConfigurationTest {
   public void getConfigFSPath() throws ConfigurationException {
     System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), "/usr/lib/zeppelin");
     System.setProperty(ConfVars.ZEPPELIN_CONFIG_FS_DIR.getVarName(), "conf");
-    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-site.xml"));
+    ZeppelinConfiguration conf = new ZeppelinConfiguration(this.getClass().getResource("/zeppelin-test-site.xml"));
     assertEquals("/usr/lib/zeppelin/conf", conf.getConfigFSDir());
 
     System.setProperty(ConfVars.ZEPPELIN_CONFIG_STORAGE_CLASS.getVarName(), "org.apache.zeppelin.storage.FileSystemConfigStorage");
diff --git a/zeppelin-zengine/src/test/resources/zeppelin-site.xml b/zeppelin-zengine/src/test/resources/zeppelin-test-site.xml
similarity index 100%
rename from zeppelin-zengine/src/test/resources/zeppelin-site.xml
rename to zeppelin-zengine/src/test/resources/zeppelin-test-site.xml