You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/18 22:41:01 UTC

[2/2] zeppelin git commit: ZEPPELIN-2898. Support Yarn-Cluster for Spark Interpreter

ZEPPELIN-2898. Support Yarn-Cluster for Spark Interpreter

### What is this PR for?
This is the first version for supporting yarn-cluster of `SparkInterpreter`.   I just delegate all the function to `spark-submit` as yarn-cluster is natively supported by spark, we don't need to reinvent the wheel. But there's still improvement to be done in future, e.g. I put some spark specific logic in `InterpreterSetting` which is not a good practise.  I plan to improve it when I refactor the `Interpreter` class (ZEPPELIN-2685).

Besides that, I also add `MiniHadoopCluster` & `MiniZeppelin` which help for the integration test of yarn-client & yarn-cluster mode, otherwise I have to manually verify yarn-client & yarn-cluster mode which would easily cause regression issue in future.

To be noticed:

* SPARK_HOME must be specified for yarn-cluster mode
* HADOOP_CONF_DIR must be specified for yarn-cluster mode

### What type of PR is it?
[Feature]

### Todos
* [ ] - Task

### What is the Jira issue?
https://github.com/zjffdu/zeppelin/tree/ZEPPELIN-2898

### How should this be tested?
System test is added in `SparkInterpreterIT`.

### Questions:
* Does the licenses files need update?  No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #2577 from zjffdu/ZEPPELIN-2898 and squashes the following commits:

9da7c4b [Jeff Zhang] ZEPPELIN-2898. Support Yarn-Cluster for Spark Interpreter


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5d715109
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5d715109
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5d715109

Branch: refs/heads/master
Commit: 5d7151097e171c5ec9f22f150ac4ce92b5512013
Parents: d25639c
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Sep 4 21:54:56 2017 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Sep 19 06:40:51 2017 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 bin/common.sh                                   |   6 +-
 bin/interpreter.sh                              |  12 +-
 conf/log4j_yarn_cluster.properties              |  23 ++
 docs/interpreter/spark.md                       |   2 +-
 .../zeppelin/python/IPythonInterpreter.java     |  44 +--
 .../zeppelin/spark/IPySparkInterpreter.java     |  11 +-
 .../zeppelin/spark/PySparkInterpreter.java      |  22 +-
 .../org/apache/zeppelin/spark/PythonUtils.java  |  96 +++++++
 .../apache/zeppelin/spark/SparkInterpreter.java |  99 +------
 .../src/main/resources/interpreter-setting.json |   4 +-
 .../zeppelin/spark/IPySparkInterpreterTest.java |   1 +
 .../spark/PySparkInterpreterMatplotlibTest.java |   2 +-
 .../zeppelin/spark/PySparkInterpreterTest.java  |   2 +-
 .../interpreter/InterpreterContext.java         |  47 ++-
 zeppelin-server/pom.xml                         |  12 +
 .../zeppelin/integration/AuthenticationIT.java  |   2 +-
 .../integration/InterpreterModeActionsIT.java   |   2 +-
 .../integration/PersonalizeActionsIT.java       |   2 +-
 .../zeppelin/rest/AbstractTestRestApi.java      |   8 +-
 zeppelin-zengine/pom.xml                        | 286 ++++++++++++++++++-
 .../zeppelin/conf/ZeppelinConfiguration.java    |   2 +-
 .../interpreter/InterpreterSetting.java         | 131 ++++++++-
 .../interpreter/InterpreterSettingManager.java  |  15 +-
 .../remote/RemoteInterpreterEventPoller.java    |   6 +-
 .../remote/RemoteInterpreterManagedProcess.java |   2 +
 .../remote/RemoteInterpreterProcess.java        |   2 +-
 .../interpreter/AbstractInterpreterTest.java    |  23 +-
 .../zeppelin/interpreter/MiniHadoopCluster.java | 114 ++++++++
 .../zeppelin/interpreter/MiniZeppelin.java      |  68 +++++
 .../interpreter/SparkInterpreterModeTest.java   | 147 ++++++++++
 .../notebook/repo/VFSNotebookRepoTest.java      |   4 +-
 32 files changed, 1007 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 0d9e72a..4495aa4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -66,7 +66,8 @@ matrix:
     # Several tests were excluded from this configuration due to the following issues:
     # HeliumApplicationFactoryTest - https://issues.apache.org/jira/browse/ZEPPELIN-2470
     # After issues are fixed these tests need to be included back by removing them from the "-Dtests.to.exclude" property
-    - jdk: "oraclejdk8"
+    - sudo: required
+      jdk: "oraclejdk8"
       dist: precise
       env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Pweb-ci -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org.apache.zeppelin.spark.*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
 
@@ -143,6 +144,7 @@ before_script:
   - if [[ -n $LIVY_VER ]]; then ./testing/downloadLivy.sh $LIVY_VER; fi
   - if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-$LIVY_VER-bin; fi
   - if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
+  - export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER
   - echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
   - echo "export ZEPPELIN_HELIUM_REGISTRY=helium" >> conf/zeppelin-env.sh
   - tail conf/zeppelin-env.sh

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/bin/common.sh
----------------------------------------------------------------------
diff --git a/bin/common.sh b/bin/common.sh
index c7100c7..d425cb1 100644
--- a/bin/common.sh
+++ b/bin/common.sh
@@ -122,7 +122,11 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
 export JAVA_OPTS
 
 JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
-JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
+if [[ -z "${SPARK_YARN_CLUSTER}" ]]; then
+    JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
+else
+    JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
+fi
 export JAVA_INTP_OPTS
 
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index fd93a06..5245e25 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -143,7 +143,12 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
       export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}"
     fi
     unset PYSPARKPATH
+    export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}"
+  fi
 
+  if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
+    ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
+  else
     # autodetect HADOOP_CONF_HOME by heuristic
     if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
       if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
@@ -152,13 +157,8 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
         export HADOOP_CONF_DIR="/etc/hadoop/conf"
       fi
     fi
-
-    if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
-      ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
-    fi
-
-    export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}"
   fi
+
 elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then
   if [[ -n "${HBASE_CONF_DIR}" ]]; then
     ZEPPELIN_INTP_CLASSPATH+=":${HBASE_CONF_DIR}"

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/conf/log4j_yarn_cluster.properties
----------------------------------------------------------------------
diff --git a/conf/log4j_yarn_cluster.properties b/conf/log4j_yarn_cluster.properties
new file mode 100644
index 0000000..532fc5e
--- /dev/null
+++ b/conf/log4j_yarn_cluster.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger = INFO, stdout
+
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
+

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/docs/interpreter/spark.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 780c60a..be5b3e5 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -424,7 +424,7 @@ It creates separated SparkContext per each notebook in `isolated` mode.
 ## IPython support
 
 By default, zeppelin would use IPython in `pyspark` when IPython is available, Otherwise it would fall back to the original PySpark implementation.
-If you don't want to use IPython, then you can set `zeppelin.spark.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
+If you don't want to use IPython, then you can set `zeppelin.pyspark.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
 [Python Interpreter](python.html)
 
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
index 9b6f730..193c343 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
@@ -78,6 +78,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
   private long ipythonLaunchTimeout;
   private String additionalPythonPath;
   private String additionalPythonInitFile;
+  private boolean useBuiltinPy4j = true;
 
   private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
 
@@ -92,6 +93,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
    * @param additionalPythonPath
    */
   public void setAdditionalPythonPath(String additionalPythonPath) {
+    LOGGER.info("setAdditionalPythonPath: " + additionalPythonPath);
     this.additionalPythonPath = additionalPythonPath;
   }
 
@@ -105,6 +107,10 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
     this.additionalPythonInitFile = additionalPythonInitFile;
   }
 
+  public void setAddBulitinPy4j(boolean add) {
+    this.useBuiltinPy4j = add;
+  }
+
   @Override
   public void open() {
     try {
@@ -113,6 +119,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
         return;
       }
       pythonExecutable = getProperty().getProperty("zeppelin.python", "python");
+      LOGGER.info("Python Exec: " + pythonExecutable);
       ipythonLaunchTimeout = Long.parseLong(
           getProperty().getProperty("zeppelin.ipython.launch.timeout", "30000"));
       this.zeppelinContext = new PythonZeppelinContext(
@@ -218,29 +225,34 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
     watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
     executor.setWatchdog(watchDog);
 
-    String py4jLibPath = null;
-    if (System.getenv("ZEPPELIN_HOME") != null) {
-      py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
-          + PythonInterpreter.ZEPPELIN_PY4JPATH;
-    } else {
-      Path workingPath = Paths.get("..").toAbsolutePath();
-      py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
-    }
-    if (additionalPythonPath != null) {
-      // put the py4j at the end, because additionalPythonPath may already contain py4j.
-      // e.g. PySparkInterpreter
-      additionalPythonPath = additionalPythonPath + ":" + py4jLibPath;
-    } else {
-      additionalPythonPath = py4jLibPath;
+    if (useBuiltinPy4j) {
+      String py4jLibPath = null;
+      if (System.getenv("ZEPPELIN_HOME") != null) {
+        py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
+            + PythonInterpreter.ZEPPELIN_PY4JPATH;
+      } else {
+        Path workingPath = Paths.get("..").toAbsolutePath();
+        py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
+      }
+      if (additionalPythonPath != null) {
+        // put the py4j at the end, because additionalPythonPath may already contain py4j.
+        // e.g. PySparkInterpreter
+        additionalPythonPath = additionalPythonPath + ":" + py4jLibPath;
+      } else {
+        additionalPythonPath = py4jLibPath;
+      }
     }
+
     Map<String, String> envs = EnvironmentUtils.getProcEnvironment();
     if (envs.containsKey("PYTHONPATH")) {
-      envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
+      if (additionalPythonPath != null) {
+        envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
+      }
     } else {
       envs.put("PYTHONPATH", additionalPythonPath);
     }
 
-    LOGGER.debug("PYTHONPATH: " + envs.get("PYTHONPATH"));
+    LOGGER.info("PYTHONPATH: " + envs.get("PYTHONPATH"));
     executor.execute(cmd, envs, this);
 
     // wait until IPython kernel is started or timeout

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index f1b1435..56b3823 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -44,12 +44,15 @@ public class IPySparkInterpreter extends IPythonInterpreter {
 
   @Override
   public void open() {
-    getProperty().setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
+    property.setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
     sparkInterpreter = getSparkInterpreter();
     SparkConf conf = sparkInterpreter.getSparkContext().getConf();
-    String additionalPythonPath = conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
-        ":../interpreter/lib/python";
-    setAdditionalPythonPath(additionalPythonPath);
+    // only set PYTHONPATH in local or yarn-client mode.
+    // yarn-cluster will setup PYTHONPATH automatically.
+    if (!conf.get("spark.submit.deployMode").equals("cluster")) {
+      setAdditionalPythonPath(PythonUtils.sparkPythonPath());
+      setAddBulitinPy4j(false);
+    }
     setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
     super.open();
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index e65df22..dd32059 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -115,7 +115,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
   public void open() {
     // try IPySparkInterpreter first
     iPySparkInterpreter = getIPySparkInterpreter();
-    if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true") &&
+    if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true") &&
         iPySparkInterpreter.checkIPythonPrerequisite()) {
       try {
         iPySparkInterpreter.open();
@@ -133,7 +133,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
     iPySparkInterpreter = null;
 
-    if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true")) {
+    if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) {
       // don't print it when it is in testing, just for easy output check in test.
       try {
         InterpreterContext.get().out.write(("IPython is not available, " +
@@ -202,13 +202,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
   }
 
-  private Map setupPySparkEnv() throws IOException{
+  private Map setupPySparkEnv() throws IOException {
     Map env = EnvironmentUtils.getProcEnvironment();
 
-    if (!env.containsKey("PYTHONPATH")) {
-      SparkConf conf = getSparkConf();
-      env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
-              ":../interpreter/lib/python");
+    // only set PYTHONPATH in local or yarn-client mode.
+    // yarn-cluster will setup PYTHONPATH automatically.
+    SparkConf conf = getSparkConf();
+    if (!conf.get("spark.submit.deployMode", "client").equals("cluster")) {
+      if (!env.containsKey("PYTHONPATH")) {
+        env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
+      } else {
+        env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
+      }
     }
 
     // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT
@@ -223,7 +228,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
       }
     }
 
-    LOGGER.debug("PYTHONPATH: " + env.get("PYTHONPATH"));
+    LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH"));
     return env;
   }
 
@@ -251,6 +256,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     gatewayServer.start();
 
     String pythonExec = getPythonExec(property);
+    LOGGER.info("pythonExec: " + pythonExec);
     CommandLine cmd = CommandLine.parse(pythonExec);
     cmd.addArgument(scriptPath, false);
     cmd.addArgument(Integer.toString(port), false);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java b/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
new file mode 100644
index 0000000..8182690
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.spark;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Util class for PySpark
+ */
+public class PythonUtils {
+
+  /**
+   * Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from ZEPPELIN_HOME
+   * when it is embedded mode.
+   *
+   * This method will called in zeppelin server process and spark driver process when it is
+   * local or yarn-client mode.
+   */
+  public static String sparkPythonPath() {
+    List<String> pythonPath = new ArrayList<String>();
+    String sparkHome = System.getenv("SPARK_HOME");
+    String zeppelinHome = System.getenv("ZEPPELIN_HOME");
+    if (zeppelinHome == null) {
+      zeppelinHome = new File("..").getAbsolutePath();
+    }
+    if (sparkHome != null) {
+      // non-embedded mode when SPARK_HOME is specified.
+      File pyspark = new File(sparkHome, "python/lib/pyspark.zip");
+      if (!pyspark.exists()) {
+        throw new RuntimeException("No pyspark.zip found under " + sparkHome + "/python/lib");
+      }
+      pythonPath.add(pyspark.getAbsolutePath());
+      File[] py4j = new File(sparkHome + "/python/lib").listFiles(new FilenameFilter() {
+        @Override
+        public boolean accept(File dir, String name) {
+          return name.startsWith("py4j");
+        }
+      });
+      if (py4j.length == 0) {
+        throw new RuntimeException("No py4j files found under " + sparkHome + "/python/lib");
+      } else if (py4j.length > 1) {
+        throw new RuntimeException("Multiple py4j files found under " + sparkHome + "/python/lib");
+      } else {
+        pythonPath.add(py4j[0].getAbsolutePath());
+      }
+    } else {
+      // embedded mode
+      File pyspark = new File(zeppelinHome, "interpreter/spark/pyspark/pyspark.zip");
+      if (!pyspark.exists()) {
+        throw new RuntimeException("No pyspark.zip found: " + pyspark.getAbsolutePath());
+      }
+      pythonPath.add(pyspark.getAbsolutePath());
+      File[] py4j = new File(zeppelinHome, "interpreter/spark/pyspark").listFiles(
+          new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+              return name.startsWith("py4j");
+            }
+          });
+      if (py4j.length == 0) {
+        throw new RuntimeException("No py4j files found under " + zeppelinHome +
+            "/interpreter/spark/pyspark");
+      } else if (py4j.length > 1) {
+        throw new RuntimeException("Multiple py4j files found under " + sparkHome +
+            "/interpreter/spark/pyspark");
+      } else {
+        pythonPath.add(py4j[0].getAbsolutePath());
+      }
+    }
+
+    // add ${ZEPPELIN_HOME}/interpreter/lib/python for all the cases
+    pythonPath.add(zeppelinHome + "/interpreter/lib/python");
+    return StringUtils.join(pythonPath, ":");
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index fd12a72..18da034 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -351,7 +351,11 @@ public class SparkInterpreter extends Interpreter {
   }
 
   public boolean isYarnMode() {
-    return getProperty("master").startsWith("yarn");
+    String master = getProperty("master");
+    if (master == null) {
+      master = getProperty().getProperty("spark.master", "local[*]");
+    }
+    return master.startsWith("yarn");
   }
 
   /**
@@ -371,11 +375,6 @@ public class SparkInterpreter extends Interpreter {
       conf.set("spark.executor.uri", execUri);
     }
     conf.set("spark.scheduler.mode", "FAIR");
-    conf.setMaster(getProperty("master"));
-    if (isYarnMode()) {
-      conf.set("master", "yarn");
-      conf.set("spark.submit.deployMode", "client");
-    }
 
     Properties intpProperty = getProperty();
     for (Object k : intpProperty.keySet()) {
@@ -394,8 +393,6 @@ public class SparkInterpreter extends Interpreter {
       }
     }
 
-    setupConfForPySpark(conf);
-    setupConfForSparkR(conf);
     Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
     Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
     Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
@@ -529,96 +526,10 @@ public class SparkInterpreter extends Interpreter {
         }
       }
     }
-    setupConfForPySpark(conf);
-    setupConfForSparkR(conf);
     SparkContext sparkContext = new SparkContext(conf);
     return sparkContext;
   }
 
-  private void setupConfForPySpark(SparkConf conf) {
-    Object pysparkBaseProperty =
-        new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue();
-    String pysparkBasePath = pysparkBaseProperty != null ? pysparkBaseProperty.toString() : null;
-    File pysparkPath;
-    if (null == pysparkBasePath) {
-      pysparkBasePath =
-          new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../")
-              .getValue().toString();
-      pysparkPath = new File(pysparkBasePath,
-          "interpreter" + File.separator + "spark" + File.separator + "pyspark");
-    } else {
-      pysparkPath = new File(pysparkBasePath,
-          "python" + File.separator + "lib");
-    }
-
-    //Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist
-    //TODO(zjffdu), this is not maintainable when new version is added.
-    String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip",
-      "py4j-0.10.1-src.zip", "py4j-0.10.3-src.zip", "py4j-0.10.4-src.zip"};
-    ArrayList<String> pythonLibUris = new ArrayList<>();
-    for (String lib : pythonLibs) {
-      File libFile = new File(pysparkPath, lib);
-      if (libFile.exists()) {
-        pythonLibUris.add(libFile.toURI().toString());
-      }
-    }
-    pythonLibUris.trimToSize();
-
-    // Distribute two libraries(pyspark.zip and py4j-*.zip) to workers
-    // when spark version is less than or equal to 1.4.1
-    if (pythonLibUris.size() == 2) {
-      try {
-        String confValue = conf.get("spark.yarn.dist.files");
-        conf.set("spark.yarn.dist.files", confValue + "," + Joiner.on(",").join(pythonLibUris));
-      } catch (NoSuchElementException e) {
-        conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris));
-      }
-      if (!useSparkSubmit()) {
-        conf.set("spark.files", conf.get("spark.yarn.dist.files"));
-      }
-      conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
-      conf.set("spark.submit.pyFiles", Joiner.on(",").join(pythonLibUris));
-    }
-
-    // Distributes needed libraries to workers
-    // when spark version is greater than or equal to 1.5.0
-    if (isYarnMode()) {
-      conf.set("spark.yarn.isPython", "true");
-    }
-  }
-
-  private void setupConfForSparkR(SparkConf conf) {
-    Object sparkRBaseProperty =
-        new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue();
-    String sparkRBasePath = sparkRBaseProperty != null ? sparkRBaseProperty.toString() : null;
-    File sparkRPath;
-    if (null == sparkRBasePath) {
-      sparkRBasePath =
-          new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../")
-              .getValue().toString();
-      sparkRPath = new File(sparkRBasePath,
-              "interpreter" + File.separator + "spark" + File.separator + "R");
-    } else {
-      sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib");
-    }
-
-    sparkRPath = new File(sparkRPath, "sparkr.zip");
-    if (sparkRPath.exists() && sparkRPath.isFile()) {
-      String archives = null;
-      if (conf.contains("spark.yarn.dist.archives")) {
-        archives = conf.get("spark.yarn.dist.archives");
-      }
-      if (archives != null) {
-        archives = archives + "," + sparkRPath + "#sparkr";
-      } else {
-        archives = sparkRPath + "#sparkr";
-      }
-      conf.set("spark.yarn.dist.archives", archives);
-    } else {
-      logger.warn("sparkr.zip is not found, sparkr may not work.");
-    }
-  }
-
   static final String toString(Object o) {
     return (o instanceof String) ? (String) o : "";
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/interpreter-setting.json b/spark/src/main/resources/interpreter-setting.json
index d646805..7c13c49 100644
--- a/spark/src/main/resources/interpreter-setting.json
+++ b/spark/src/main/resources/interpreter-setting.json
@@ -150,9 +150,9 @@
         "description": "Python command to run pyspark with",
         "type": "string"
       },
-      "zeppelin.spark.useIPython": {
+      "zeppelin.pyspark.useIPython": {
         "envName": null,
-        "propertyName": "zeppelin.spark.useIPython",
+        "propertyName": "zeppelin.pyspark.useIPython",
         "defaultValue": true,
         "description": "whether use IPython when it is available",
         "type": "checkbox"

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index 5a2e884..3f7cf75 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -59,6 +59,7 @@ public class IPySparkInterpreterTest {
     Properties p = new Properties();
     p.setProperty("spark.master", "local[4]");
     p.setProperty("master", "local[4]");
+    p.setProperty("spark.submit.deployMode", "client");
     p.setProperty("spark.app.name", "Zeppelin Test");
     p.setProperty("zeppelin.spark.useHiveContext", "true");
     p.setProperty("zeppelin.spark.maxResult", "1000");

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
index c6eb1d4..d695037 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
@@ -89,7 +89,7 @@ public class PySparkInterpreterMatplotlibTest {
     p.setProperty("zeppelin.spark.importImplicit", "true");
     p.setProperty("zeppelin.pyspark.python", "python");
     p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
-    p.setProperty("zeppelin.spark.useIPython", "false");
+    p.setProperty("zeppelin.pyspark.useIPython", "false");
     return p;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index ffdb4e8..7a4abd6 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -59,7 +59,7 @@ public class PySparkInterpreterTest {
     p.setProperty("zeppelin.spark.importImplicit", "true");
     p.setProperty("zeppelin.pyspark.python", "python");
     p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
-    p.setProperty("zeppelin.spark.useIPython", "false");
+    p.setProperty("zeppelin.pyspark.useIPython", "false");
     return p;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 4288ea3..f5fc70b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -17,6 +17,8 @@
 
 package org.apache.zeppelin.interpreter;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -34,7 +36,7 @@ import org.apache.zeppelin.resource.ResourcePool;
 public class InterpreterContext {
   private static final ThreadLocal<InterpreterContext> threadIC = new ThreadLocal<>();
 
-  public final InterpreterOutput out;
+  public InterpreterOutput out;
 
   public static InterpreterContext get() {
     return threadIC.get();
@@ -48,21 +50,46 @@ public class InterpreterContext {
     threadIC.remove();
   }
 
-  private final String noteId;
-  private final String replName;
-  private final String paragraphTitle;
-  private final String paragraphId;
-  private final String paragraphText;
+  private String noteId;
+  private String replName;
+  private String paragraphTitle;
+  private String paragraphId;
+  private String paragraphText;
   private AuthenticationInfo authenticationInfo;
-  private final Map<String, Object> config;
-  private GUI gui;
+  private Map<String, Object> config = new HashMap<>();
+  private GUI gui = new GUI();
   private AngularObjectRegistry angularObjectRegistry;
   private ResourcePool resourcePool;
-  private List<InterpreterContextRunner> runners;
+  private List<InterpreterContextRunner> runners = new ArrayList<>();
   private String className;
   private RemoteEventClientWrapper client;
   private RemoteWorksController remoteWorksController;
-  private final Map<String, Integer> progressMap;
+  private Map<String, Integer> progressMap;
+
+  /**
+   * Builder class for InterpreterContext
+   */
+  public static class Builder {
+    private InterpreterContext context = new InterpreterContext();
+
+    public Builder setNoteId(String noteId) {
+      context.noteId = noteId;
+      return this;
+    }
+
+    public Builder setParagraphId(String paragraphId) {
+      context.paragraphId = paragraphId;
+      return this;
+    }
+
+    public InterpreterContext getContext() {
+      return context;
+    }
+  }
+
+  private InterpreterContext() {
+
+  }
 
   // visible for testing
   public InterpreterContext(String noteId,

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index 24d5ee7..e8db0c5 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -93,6 +93,18 @@
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>jackson-databind</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java
index 7debf1b..3d1406a 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java
@@ -83,7 +83,7 @@ public class AuthenticationIT extends AbstractZeppelinIT {
     }
 
     try {
-      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
+      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
       ZeppelinConfiguration conf = ZeppelinConfiguration.create();
       shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
       File file = new File(shiroPath);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java
index 7f8765f..999e796 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java
@@ -82,7 +82,7 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT {
       return;
     }
     try {
-      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
+      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
       ZeppelinConfiguration conf = ZeppelinConfiguration.create();
       shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
       interpreterOptionPath = conf.getRelativeDir(String.format("%s/interpreter.json", conf.getConfDir()));

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java
index b813ea9..dc07435 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java
@@ -74,7 +74,7 @@ public class PersonalizeActionsIT extends AbstractZeppelinIT {
       return;
     }
     try {
-      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
+      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
       ZeppelinConfiguration conf = ZeppelinConfiguration.create();
       shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
       File file = new File(shiroPath);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index e2f171f..7675cf6 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -126,8 +126,8 @@ public abstract class AbstractTestRestApi {
 
   private static void start(boolean withAuth) throws Exception {
     if (!wasRunning) {
-      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
-      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), "../zeppelin-web/dist");
+      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
+      System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), new File("../zeppelin-web/dist").getAbsolutePath());
 
       // some test profile does not build zeppelin-web.
       // to prevent zeppelin starting up fail, create zeppelin-web/dist directory
@@ -211,7 +211,7 @@ public abstract class AbstractTestRestApi {
         // set spark home for pyspark
         sparkProperties.put("spark.home",
             new InterpreterProperty("spark.home", getSparkHome(), InterpreterPropertyType.TEXTAREA.getValue()));
-        sparkProperties.put("zeppelin.spark.useIPython",  new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
+        sparkProperties.put("zeppelin.pyspark.useIPython",  new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
 
         sparkIntpSetting.setProperties(sparkProperties);
         pySpark = true;
@@ -234,7 +234,7 @@ public abstract class AbstractTestRestApi {
               new InterpreterProperty("spark.home", sparkHome, InterpreterPropertyType.TEXTAREA.getValue()));
           sparkProperties.put("zeppelin.spark.useHiveContext",
               new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue()));
-          sparkProperties.put("zeppelin.spark.useIPython",  new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
+          sparkProperties.put("zeppelin.pyspark.useIPython",  new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
 
           pySpark = true;
           sparkR = true;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 314ca18..c67df6b 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -36,7 +36,7 @@
 
   <properties>
     <!--library versions-->
-    <hadoop.version>2.6.0</hadoop.version>
+    <hadoop.version>2.7.3</hadoop.version>
     <commons.lang3.version>3.4</commons.lang3.version>
     <commons.vfs2.version>2.0</commons.vfs2.version>
     <aws.sdk.s3.version>1.10.62</aws.sdk.s3.version>
@@ -214,12 +214,6 @@
       </exclusions>
     </dependency>
 
-    <dependency> <!-- because there are two of them above  -->
-      <groupId>xml-apis</groupId>
-      <artifactId>xml-apis</artifactId>
-      <version>${xml.apis.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>org.eclipse.jgit</groupId>
       <artifactId>org.eclipse.jgit</artifactId>
@@ -299,21 +293,91 @@
     </dependency>
 
     <dependency>
-      <groupId>org.mongodb</groupId>
-      <artifactId>mongo-java-driver</artifactId>
-      <version>3.4.1</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
       <version>1.5</version>
     </dependency>
 
     <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <version>3.4.1</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.jackrabbit</groupId>
+          <artifactId>jackrabbit-webdav</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jgit</groupId>
+          <artifactId>org.eclipse.jgit</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.jcraft</groupId>
+          <artifactId>jsch</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-compress</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>xml-apis</groupId>
+          <artifactId>xml-apis</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>xerces</groupId>
+          <artifactId>xercesImpl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jersey</groupId>
@@ -325,9 +389,90 @@
         </exclusion>
         <exclusion>
           <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
           <artifactId>jersey-server</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.jackrabbit</groupId>
+          <artifactId>jackrabbit-webdav</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jgit</groupId>
+          <artifactId>org.eclipse.jgit</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.jcraft</groupId>
+          <artifactId>jsch</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-compress</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>xml-apis</groupId>
+          <artifactId>xml-apis</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>xerces</groupId>
+          <artifactId>xercesImpl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <!--<exclusion>-->
+          <!--<groupId>com.sun.jersey</groupId>-->
+          <!--<artifactId>jersey-core</artifactId>-->
+        <!--</exclusion>-->
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-client</artifactId>
+        </exclusion>
+        <!--<exclusion>-->
+          <!--<groupId>com.sun.jersey</groupId>-->
+          <!--<artifactId>jersey-server</artifactId>-->
+        <!--</exclusion>-->
         <exclusion>
           <groupId>javax.servlet</groupId>
           <artifactId>servlet-api</artifactId>
@@ -349,8 +494,74 @@
           <artifactId>commons-httpclient</artifactId>
         </exclusion>
         <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
+          <groupId>org.eclipse.jgit</groupId>
+          <artifactId>org.eclipse.jgit</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.jcraft</groupId>
+          <artifactId>jsch</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-compress</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>xml-apis</groupId>
+          <artifactId>xml-apis</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>xerces</groupId>
+          <artifactId>xercesImpl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <version>${hadoop.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <!--<exclusion>-->
+          <!--<groupId>com.sun.jersey</groupId>-->
+          <!--<artifactId>jersey-json</artifactId>-->
+        <!--</exclusion>-->
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.jackrabbit</groupId>
+          <artifactId>jackrabbit-webdav</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
         </exclusion>
         <exclusion>
           <groupId>org.eclipse.jgit</groupId>
@@ -373,9 +584,50 @@
           <artifactId>xercesImpl</artifactId>
         </exclusion>
         <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-jaxrs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-xc</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-spark_2.10</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
           <groupId>com.google.guava</groupId>
           <artifactId>guava</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.google.errorprone</groupId>
+          <artifactId>error_prone_annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-context</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
   </dependencies>
@@ -392,6 +644,12 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <forkMode>always</forkMode>
+          <systemProperties>
+            <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+          </systemProperties>
+          <environmentVariables>
+            <!--<ZEPPELIN_HOME>..</ZEPPELIN_HOME>-->
+          </environmentVariables>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index ba90ed8..2dec19c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -493,7 +493,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
   }
 
   public String getConfDir() {
-    return getString(ConfVars.ZEPPELIN_CONF_DIR);
+    return getRelativeDir(ConfVars.ZEPPELIN_CONF_DIR);
   }
 
   public List<String> getAllowedOrigins()

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 79618a3..9a453d8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.interpreter;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.gson.JsonArray;
@@ -26,6 +27,7 @@ import com.google.gson.JsonObject;
 import com.google.gson.annotations.SerializedName;
 import com.google.gson.internal.StringMap;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.dep.Dependency;
 import org.apache.zeppelin.dep.DependencyResolver;
@@ -44,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
@@ -55,6 +58,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -452,7 +456,9 @@ public class InterpreterSetting {
     Properties jProperties = new Properties();
     Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties;
     for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) {
-      jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString());
+      if (entry.getValue().getValue() != null) {
+        jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString());
+      }
     }
 
     if (!jProperties.containsKey("zeppelin.interpreter.output.limit")) {
@@ -707,22 +713,133 @@ public class InterpreterSetting {
           interpreterRunner != null ? interpreterRunner.getPath() :
               conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
           interpreterDir, localRepoPath,
-          getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
+          getEnvFromInterpreterProperty(), connectTimeout,
           remoteInterpreterProcessListener, appEventListener, group);
     }
     return remoteInterpreterProcess;
   }
 
-  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
-    Map<String, String> env = new HashMap<>();
-    for (Object key : property.keySet()) {
-      if (RemoteInterpreterUtils.isEnvString((String) key)) {
-        env.put((String) key, property.getProperty((String) key));
+  private boolean isSparkConf(String key, String value) {
+    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
+  }
+
+  private Map<String, String> getEnvFromInterpreterProperty() {
+    Map<String, String> env = new HashMap<String, String>();
+    Properties javaProperties = getJavaProperties();
+    Properties sparkProperties = new Properties();
+    String sparkMaster = getSparkMaster();
+    for (String key : javaProperties.stringPropertyNames()) {
+      if (RemoteInterpreterUtils.isEnvString(key)) {
+        env.put(key, javaProperties.getProperty(key));
       }
+      if (isSparkConf(key, javaProperties.getProperty(key))) {
+        sparkProperties.setProperty(key, toShellFormat(javaProperties.getProperty(key)));
+      }
+    }
+
+    setupPropertiesForPySpark(sparkProperties);
+    setupPropertiesForSparkR(sparkProperties, javaProperties.getProperty("SPARK_HOME"));
+    if (isYarnMode() && getDeployMode().equals("cluster")) {
+      env.put("SPARK_YARN_CLUSTER", "true");
     }
+
+    StringBuilder sparkConfBuilder = new StringBuilder();
+    if (sparkMaster != null) {
+      sparkConfBuilder.append(" --master " + sparkMaster);
+    }
+    if (isYarnMode() && getDeployMode().equals("cluster")) {
+      sparkConfBuilder.append(" --files " + conf.getConfDir() + "/log4j_yarn_cluster.properties");
+    }
+    for (String name : sparkProperties.stringPropertyNames()) {
+      sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
+    }
+
+    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+    LOGGER.debug("getEnvFromInterpreterProperty: " + env);
     return env;
   }
 
+  private void setupPropertiesForPySpark(Properties sparkProperties) {
+    if (isYarnMode()) {
+      sparkProperties.setProperty("spark.yarn.isPython", "true");
+    }
+  }
+
+  private void mergeSparkProperty(Properties sparkProperties, String propertyName,
+                                  String propertyValue) {
+    if (sparkProperties.containsKey(propertyName)) {
+      String oldPropertyValue = sparkProperties.getProperty(propertyName);
+      sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
+    } else {
+      sparkProperties.setProperty(propertyName, propertyValue);
+    }
+  }
+
+  private void setupPropertiesForSparkR(Properties sparkProperties,
+                                        String sparkHome) {
+    File sparkRBasePath = null;
+    if (sparkHome == null) {
+      if (!getSparkMaster().startsWith("local")) {
+        throw new RuntimeException("SPARK_HOME is not specified for non-local mode");
+      }
+      String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
+      sparkRBasePath = new File(zeppelinHome,
+          "interpreter" + File.separator + "spark" + File.separator + "R");
+    } else {
+      sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
+    }
+
+    File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
+    if (sparkRPath.exists() && sparkRPath.isFile()) {
+      mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
+    } else {
+      LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
+    }
+  }
+
+  private String getSparkMaster() {
+    String master = getJavaProperties().getProperty("master");
+    if (master == null) {
+      master = getJavaProperties().getProperty("spark.master", "local[*]");
+    }
+    return master;
+  }
+
+  private String getDeployMode() {
+    String master = getSparkMaster();
+    if (master.equals("yarn-client")) {
+      return "client";
+    } else if (master.equals("yarn-cluster")) {
+      return "cluster";
+    } else if (master.startsWith("local")) {
+      return "client";
+    } else {
+      String deployMode = getJavaProperties().getProperty("spark.submit.deployMode");
+      if (deployMode == null) {
+        throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
+            "is not specified");
+      }
+      if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
+        throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
+      }
+      return deployMode;
+    }
+  }
+
+  private boolean isYarnMode() {
+    return getSparkMaster().startsWith("yarn");
+  }
+
+  private String toShellFormat(String value) {
+    if (value.contains("\'") && value.contains("\"")) {
+      throw new RuntimeException("Spark property value could not contain both \" and '");
+    } else if (value.contains("\'")) {
+      return "\"" + value + "\"";
+    } else {
+      return "\'" + value + "\'";
+    }
+  }
+
   private List<Interpreter> getOrCreateSession(String user, String noteId) {
     ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId);
     Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user {}, " +

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 585a58a..73babab 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -142,7 +142,7 @@ public class InterpreterSettingManager {
     this.interpreterDirPath = Paths.get(conf.getInterpreterDir());
     LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath);
     this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath());
-    LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath);
+    LOGGER.debug("InterpreterSettingPath: {}", interpreterSettingPath);
     this.dependencyResolver = new DependencyResolver(
         conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
     this.interpreterRepositories = dependencyResolver.getRepos();
@@ -283,7 +283,7 @@ public class InterpreterSettingManager {
   private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
       String interpreterJson) throws IOException {
     URL[] urls = recursiveBuildLibList(new File(interpreterDir));
-    ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
+    ClassLoader tempClassLoader = new URLClassLoader(urls, null);
 
     URL url = tempClassLoader.getResource(interpreterJson);
     if (url == null) {
@@ -392,6 +392,17 @@ public class InterpreterSettingManager {
     return settings;
   }
 
+  public InterpreterSetting getInterpreterSettingByName(String name) {
+    synchronized (interpreterSettings) {
+      for (InterpreterSetting setting : interpreterSettings.values()) {
+        if (setting.getName().equals(name)) {
+          return setting;
+        }
+      }
+    }
+    throw new RuntimeException("No such interpreter setting: " + name);
+  }
+
   public ManagedInterpreterGroup getInterpreterGroupById(String groupId) {
     for (InterpreterSetting setting : interpreterSettings.values()) {
       ManagedInterpreterGroup interpreterGroup = setting.getInterpreterGroup(groupId);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index ca23bcf..35b6b6c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -254,7 +254,11 @@ public class RemoteInterpreterEventPoller extends Thread {
     try {
       clearUnreadEvents(interpreterProcess.getClient());
     } catch (Exception e1) {
-      logger.error("Can't get RemoteInterpreterEvent", e1);
+      if (shutdown) {
+        logger.error("Can not get RemoteInterpreterEvent because it is shutdown.");
+      } else {
+        logger.error("Can't get RemoteInterpreterEvent", e1);
+      }
     }
     if (appendFuture != null) {
       appendFuture.cancel(true);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 2d64831..d21a962 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -226,6 +226,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
   }
 
   public void stop() {
+    // shutdown EventPoller first.
+    this.remoteInterpreterEventPoller.shutdown();
     if (callbackServer.isServing()) {
       callbackServer.stop();
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index d34c538..e45f15b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -32,7 +32,7 @@ public abstract class RemoteInterpreterProcess {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
 
   private GenericObjectPool<Client> clientPool;
-  private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
+  protected final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
   private final InterpreterContextRunnerPool interpreterContextRunnerPool;
   private int connectTimeout;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
index 21d7526..9ab2137 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
@@ -25,14 +25,10 @@ import static org.mockito.Mockito.mock;
  */
 public abstract class AbstractInterpreterTest {
   protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractInterpreterTest.class);
-  private static final String INTERPRETER_SCRIPT =
-      System.getProperty("os.name").startsWith("Windows") ?
-          "../bin/interpreter.cmd" :
-          "../bin/interpreter.sh";
 
   protected InterpreterSettingManager interpreterSettingManager;
   protected InterpreterFactory interpreterFactory;
-  protected File testRootDir;
+  protected File zeppelinHome;
   protected File interpreterDir;
   protected File confDir;
   protected File notebookDir;
@@ -41,12 +37,11 @@ public abstract class AbstractInterpreterTest {
   @Before
   public void setUp() throws Exception {
     // copy the resources files to a temp folder
-    testRootDir = new File(System.getProperty("java.io.tmpdir") + "/Zeppelin_Test_" + System.currentTimeMillis());
-    testRootDir.mkdirs();
-    LOGGER.info("Create tmp directory: {} as root folder of ZEPPELIN_INTERPRETER_DIR & ZEPPELIN_CONF_DIR", testRootDir.getAbsolutePath());
-    interpreterDir = new File(testRootDir, "interpreter");
-    confDir = new File(testRootDir, "conf");
-    notebookDir = new File(testRootDir, "notebook");
+    zeppelinHome = new File("..");
+    LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
+    interpreterDir = new File(zeppelinHome, "interpreter_" + getClass().getSimpleName());
+    confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName());
+    notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName());
 
     interpreterDir.mkdirs();
     confDir.mkdirs();
@@ -55,10 +50,10 @@ public abstract class AbstractInterpreterTest {
     FileUtils.copyDirectory(new File("src/test/resources/interpreter"), interpreterDir);
     FileUtils.copyDirectory(new File("src/test/resources/conf"), confDir);
 
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsolutePath());
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath());
     System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
-    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(), INTERPRETER_SCRIPT);
 
     conf = new ZeppelinConfiguration();
     interpreterSettingManager = new InterpreterSettingManager(conf,
@@ -69,6 +64,8 @@ public abstract class AbstractInterpreterTest {
   @After
   public void tearDown() throws Exception {
     interpreterSettingManager.close();
-    FileUtils.deleteDirectory(testRootDir);
+    FileUtils.deleteDirectory(interpreterDir);
+    FileUtils.deleteDirectory(confDir);
+    FileUtils.deleteDirectory(notebookDir);
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java
new file mode 100644
index 0000000..619d01a
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java
@@ -0,0 +1,114 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+
+/**
+ *
+ * Util class for creating a Mini Hadoop cluster in local machine to test scenarios that needs
+ * hadoop cluster.
+ */
+public class MiniHadoopCluster {
+
+  private static Logger LOGGER = LoggerFactory.getLogger(MiniHadoopCluster.class);
+
+  private Configuration hadoopConf;
+  private MiniDFSCluster dfsCluster;
+  private MiniYARNCluster yarnCluster;
+  private String configPath = new File("target/tests/hadoop_conf").getAbsolutePath();
+
+  @BeforeClass
+  public void start() throws IOException {
+    LOGGER.info("Starting MiniHadoopCluster ...");
+    this.hadoopConf = new Configuration();
+    new File(configPath).mkdirs();
+    // start MiniDFSCluster
+    this.dfsCluster = new MiniDFSCluster.Builder(hadoopConf)
+        .numDataNodes(2)
+        .format(true)
+        .waitSafeMode(true)
+        .build();
+    this.dfsCluster.waitActive();
+    saveConfig(hadoopConf, configPath + "/core-site.xml");
+
+    // start MiniYarnCluster
+    YarnConfiguration baseConfig = new YarnConfiguration(hadoopConf);
+    this.yarnCluster = new MiniYARNCluster(getClass().getName(), 2,
+        1, 1);
+    yarnCluster.init(baseConfig);
+
+    // Install a shutdown hook for stop the service and kill all running applications.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        yarnCluster.stop();
+      }
+    });
+
+    yarnCluster.start();
+
+    // Workaround for YARN-2642.
+    Configuration yarnConfig = yarnCluster.getConfig();
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start < 30 * 1000) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      if (!yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")[1].equals("0")) {
+        break;
+      }
+    }
+    if (yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")[1].equals("0")) {
+      throw new IOException("RM not up yes");
+    }
+
+    LOGGER.info("RM address in configuration is " + yarnConfig.get(YarnConfiguration.RM_ADDRESS));
+    saveConfig(yarnConfig,configPath + "/yarn-site.xml");
+  }
+
+  protected void saveConfig(Configuration conf, String dest) throws IOException {
+    Configuration redacted = new Configuration(conf);
+    // This setting references a test class that is not available when using a real Spark
+    // installation, so remove it from client configs.
+    redacted.unset("net.topology.node.switch.mapping.impl");
+
+    FileOutputStream out = new FileOutputStream(dest);
+    try {
+      redacted.writeXml(out);
+    } finally {
+      out.close();
+    }
+    LOGGER.info("Save configuration to " + dest);
+  }
+
+  @AfterClass
+  public void stop() {
+    if (this.yarnCluster != null) {
+      this.yarnCluster.stop();
+    }
+    if (this.dfsCluster != null) {
+      this.dfsCluster.shutdown();
+    }
+  }
+
+  public String getConfigPath() {
+    return configPath;
+  }
+
+  public MiniYARNCluster getYarnCluster() {
+    return yarnCluster;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
new file mode 100644
index 0000000..923ae5a
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
@@ -0,0 +1,68 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+
+public class MiniZeppelin {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(MiniZeppelin.class);
+
+  protected InterpreterSettingManager interpreterSettingManager;
+  protected InterpreterFactory interpreterFactory;
+  protected File zeppelinHome;
+  private File confDir;
+  private File notebookDir;
+  protected ZeppelinConfiguration conf;
+
+  public void start() throws IOException {
+    zeppelinHome = new File("..");
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
+        zeppelinHome.getAbsolutePath());
+    confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName());
+    notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName());
+    confDir.mkdirs();
+    notebookDir.mkdirs();
+    LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
+    FileUtils.copyFile(new File(zeppelinHome, "conf/log4j.properties"), new File(confDir, "log4j.properties"));
+    FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"), new File(confDir, "log4j_yarn_cluster.properties"));
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
+    conf = new ZeppelinConfiguration();
+    interpreterSettingManager = new InterpreterSettingManager(conf,
+        mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class));
+    interpreterFactory = new InterpreterFactory(interpreterSettingManager);
+  }
+
+  public void stop() throws IOException {
+    interpreterSettingManager.close();
+    FileUtils.deleteDirectory(confDir);
+    FileUtils.deleteDirectory(notebookDir);
+  }
+
+  public File getZeppelinHome() {
+    return zeppelinHome;
+  }
+
+  public File getZeppelinConfDir() {
+    return confDir;
+  }
+
+  public InterpreterFactory getInterpreterFactory() {
+    return interpreterFactory;
+  }
+
+  public InterpreterSettingManager getInterpreterSettingManager() {
+    return interpreterSettingManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
new file mode 100644
index 0000000..24a9aee
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
@@ -0,0 +1,147 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SparkInterpreterModeTest {
+
+  private static MiniHadoopCluster hadoopCluster;
+  private static MiniZeppelin zeppelin;
+  private static InterpreterFactory interpreterFactory;
+  private static InterpreterSettingManager interpreterSettingManager;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    hadoopCluster = new MiniHadoopCluster();
+    hadoopCluster.start();
+
+    zeppelin = new MiniZeppelin();
+    zeppelin.start();
+    interpreterFactory = zeppelin.getInterpreterFactory();
+    interpreterSettingManager = zeppelin.getInterpreterSettingManager();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (zeppelin != null) {
+      zeppelin.stop();
+    }
+    if (hadoopCluster != null) {
+      hadoopCluster.stop();
+    }
+  }
+
+  private void testInterpreterBasics() throws IOException {
+    // test SparkInterpreter
+    interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
+    Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
+
+    InterpreterContext context = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").getContext();
+    InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertTrue(interpreterResult.msg.get(0).getData().contains("45"));
+
+    // test PySparkInterpreter
+    Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark");
+    interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+
+    // test IPySparkInterpreter
+    Interpreter ipySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.ipyspark");
+    interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+
+    // test SparkSQLInterpreter
+    Interpreter sqlInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.sql");
+    interpreterResult = sqlInterpreter.interpret("select count(1) from test", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    assertEquals("count(1)\n2\n", interpreterResult.message().get(0).getData());
+  }
+
+  @Test
+  public void testLocalMode() throws IOException, YarnException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", "local[*]");
+    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+
+    testInterpreterBasics();
+
+    // no yarn application launched
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(0, response.getApplicationList().size());
+
+    interpreterSettingManager.close();
+  }
+
+  @Test
+  public void testYarnClientMode() throws IOException, YarnException, InterruptedException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", "yarn-client");
+    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+    sparkInterpreterSetting.setProperty("PYSPARK_PYTHON", getPythonExec());
+    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
+
+    testInterpreterBasics();
+
+    // 1 yarn application launched
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(1, response.getApplicationList().size());
+
+    interpreterSettingManager.close();
+  }
+
+  @Test
+  public void testYarnClusterMode() throws IOException, YarnException, InterruptedException {
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", "yarn-cluster");
+    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+    sparkInterpreterSetting.setProperty("spark.pyspark.python", getPythonExec());
+    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
+
+    testInterpreterBasics();
+
+    // 1 yarn application launched
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(1, response.getApplicationList().size());
+
+    interpreterSettingManager.close();
+  }
+
+  private String getPythonExec() throws IOException, InterruptedException {
+    Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
+    if (process.waitFor() != 0) {
+      throw new RuntimeException("Fail to run command: which python.");
+    }
+    return IOUtils.toString(process.getInputStream()).trim();
+  }
+}