You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/10/14 05:46:16 UTC

[2/4] zeppelin git commit: ZEPPELIN-2685. Improvement on Interpreter class

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index c1dba5c..e2a10e6 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -123,7 +123,7 @@ public class InterpreterRestApi {
               request.getOption(), request.getProperties());
       logger.info("new setting created with {}", interpreterSetting.getId());
       return new JsonResponse<>(Status.OK, "", interpreterSetting).build();
-    } catch (InterpreterException | IOException e) {
+    } catch (IOException e) {
       logger.error("Exception in InterpreterRestApi while creating ", e);
       return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e))
           .build();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 3e46449..2fa584b 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.notebook.Note;
@@ -171,7 +172,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     }
 
     @Test
-    public void sparkRTest() throws IOException {
+    public void sparkRTest() throws IOException, InterpreterException {
       // create new note
       Note note = ZeppelinServer.notebook.createNote(anonymous);
       int sparkVersion = getSparkVersionNumber(note);
@@ -426,7 +427,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     }
 
     @Test
-    public void pySparkDepLoaderTest() throws IOException {
+    public void pySparkDepLoaderTest() throws IOException, InterpreterException {
         // create new note
         Note note = ZeppelinServer.notebook.createNote(anonymous);
         int sparkVersionNumber = getSparkVersionNumber(note);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index c67df6b..d1a2270 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -71,11 +71,6 @@
     </dependency>
 
     <dependency>
-      <groupId>commons-configuration</groupId>
-      <artifactId>commons-configuration</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
deleted file mode 100644
index 3a82bc5..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ /dev/null
@@ -1,847 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.conf;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.configuration.tree.ConfigurationNode;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Zeppelin configuration.
- *
- */
-public class ZeppelinConfiguration extends XMLConfiguration {
-  private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";
-  private static final long serialVersionUID = 4749305895693848035L;
-  private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class);
-
-  private static final String HELIUM_PACKAGE_DEFAULT_URL =
-      "https://s3.amazonaws.com/helium-package/helium.json";
-  private static ZeppelinConfiguration conf;
-
-  public ZeppelinConfiguration(URL url) throws ConfigurationException {
-    setDelimiterParsingDisabled(true);
-    load(url);
-  }
-
-  public ZeppelinConfiguration() {
-    ConfVars[] vars = ConfVars.values();
-    for (ConfVars v : vars) {
-      if (v.getType() == ConfVars.VarType.BOOLEAN) {
-        this.setProperty(v.getVarName(), v.getBooleanValue());
-      } else if (v.getType() == ConfVars.VarType.LONG) {
-        this.setProperty(v.getVarName(), v.getLongValue());
-      } else if (v.getType() == ConfVars.VarType.INT) {
-        this.setProperty(v.getVarName(), v.getIntValue());
-      } else if (v.getType() == ConfVars.VarType.FLOAT) {
-        this.setProperty(v.getVarName(), v.getFloatValue());
-      } else if (v.getType() == ConfVars.VarType.STRING) {
-        this.setProperty(v.getVarName(), v.getStringValue());
-      } else {
-        throw new RuntimeException("Unsupported VarType");
-      }
-    }
-
-  }
-
-
-  /**
-   * Load from resource.
-   *url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML);
-   * @throws ConfigurationException
-   */
-  public static synchronized ZeppelinConfiguration create() {
-    if (conf != null) {
-      return conf;
-    }
-
-    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-    URL url;
-
-    url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML);
-    if (url == null) {
-      ClassLoader cl = ZeppelinConfiguration.class.getClassLoader();
-      if (cl != null) {
-        url = cl.getResource(ZEPPELIN_SITE_XML);
-      }
-    }
-    if (url == null) {
-      url = classLoader.getResource(ZEPPELIN_SITE_XML);
-    }
-
-    if (url == null) {
-      LOG.warn("Failed to load configuration, proceeding with a default");
-      conf = new ZeppelinConfiguration();
-    } else {
-      try {
-        LOG.info("Load configuration from " + url);
-        conf = new ZeppelinConfiguration(url);
-      } catch (ConfigurationException e) {
-        LOG.warn("Failed to load configuration from " + url + " proceeding with a default", e);
-        conf = new ZeppelinConfiguration();
-      }
-    }
-
-    LOG.info("Server Host: " + conf.getServerAddress());
-    if (conf.useSsl() == false) {
-      LOG.info("Server Port: " + conf.getServerPort());
-    } else {
-      LOG.info("Server SSL Port: " + conf.getServerSslPort());
-    }
-    LOG.info("Context Path: " + conf.getServerContextPath());
-    LOG.info("Zeppelin Version: " + Util.getVersion());
-
-    return conf;
-  }
-
-
-  private String getStringValue(String name, String d) {
-    List<ConfigurationNode> properties = getRootNode().getChildren();
-    if (properties == null || properties.isEmpty()) {
-      return d;
-    }
-    for (ConfigurationNode p : properties) {
-      if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
-          && name.equals(p.getChildren("name").get(0).getValue())) {
-        return (String) p.getChildren("value").get(0).getValue();
-      }
-    }
-    return d;
-  }
-
-  private int getIntValue(String name, int d) {
-    List<ConfigurationNode> properties = getRootNode().getChildren();
-    if (properties == null || properties.isEmpty()) {
-      return d;
-    }
-    for (ConfigurationNode p : properties) {
-      if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
-          && name.equals(p.getChildren("name").get(0).getValue())) {
-        return Integer.parseInt((String) p.getChildren("value").get(0).getValue());
-      }
-    }
-    return d;
-  }
-
-  private long getLongValue(String name, long d) {
-    List<ConfigurationNode> properties = getRootNode().getChildren();
-    if (properties == null || properties.isEmpty()) {
-      return d;
-    }
-    for (ConfigurationNode p : properties) {
-      if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
-          && name.equals(p.getChildren("name").get(0).getValue())) {
-        return Long.parseLong((String) p.getChildren("value").get(0).getValue());
-      }
-    }
-    return d;
-  }
-
-  private float getFloatValue(String name, float d) {
-    List<ConfigurationNode> properties = getRootNode().getChildren();
-    if (properties == null || properties.isEmpty()) {
-      return d;
-    }
-    for (ConfigurationNode p : properties) {
-      if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
-          && name.equals(p.getChildren("name").get(0).getValue())) {
-        return Float.parseFloat((String) p.getChildren("value").get(0).getValue());
-      }
-    }
-    return d;
-  }
-
-  private boolean getBooleanValue(String name, boolean d) {
-    List<ConfigurationNode> properties = getRootNode().getChildren();
-    if (properties == null || properties.isEmpty()) {
-      return d;
-    }
-    for (ConfigurationNode p : properties) {
-      if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
-          && name.equals(p.getChildren("name").get(0).getValue())) {
-        return Boolean.parseBoolean((String) p.getChildren("value").get(0).getValue());
-      }
-    }
-    return d;
-  }
-
-  public String getString(ConfVars c) {
-    return getString(c.name(), c.getVarName(), c.getStringValue());
-  }
-
-  public String getString(String envName, String propertyName, String defaultValue) {
-    if (System.getenv(envName) != null) {
-      return System.getenv(envName);
-    }
-    if (System.getProperty(propertyName) != null) {
-      return System.getProperty(propertyName);
-    }
-
-    return getStringValue(propertyName, defaultValue);
-  }
-
-  public int getInt(ConfVars c) {
-    return getInt(c.name(), c.getVarName(), c.getIntValue());
-  }
-
-  public int getInt(String envName, String propertyName, int defaultValue) {
-    if (System.getenv(envName) != null) {
-      return Integer.parseInt(System.getenv(envName));
-    }
-
-    if (System.getProperty(propertyName) != null) {
-      return Integer.parseInt(System.getProperty(propertyName));
-    }
-    return getIntValue(propertyName, defaultValue);
-  }
-
-  public long getLong(ConfVars c) {
-    return getLong(c.name(), c.getVarName(), c.getLongValue());
-  }
-
-  public long getLong(String envName, String propertyName, long defaultValue) {
-    if (System.getenv(envName) != null) {
-      return Long.parseLong(System.getenv(envName));
-    }
-
-    if (System.getProperty(propertyName) != null) {
-      return Long.parseLong(System.getProperty(propertyName));
-    }
-    return getLongValue(propertyName, defaultValue);
-  }
-
-  public float getFloat(ConfVars c) {
-    return getFloat(c.name(), c.getVarName(), c.getFloatValue());
-  }
-
-  public float getFloat(String envName, String propertyName, float defaultValue) {
-    if (System.getenv(envName) != null) {
-      return Float.parseFloat(System.getenv(envName));
-    }
-    if (System.getProperty(propertyName) != null) {
-      return Float.parseFloat(System.getProperty(propertyName));
-    }
-    return getFloatValue(propertyName, defaultValue);
-  }
-
-  public boolean getBoolean(ConfVars c) {
-    return getBoolean(c.name(), c.getVarName(), c.getBooleanValue());
-  }
-
-  public boolean getBoolean(String envName, String propertyName, boolean defaultValue) {
-    if (System.getenv(envName) != null) {
-      return Boolean.parseBoolean(System.getenv(envName));
-    }
-
-    if (System.getProperty(propertyName) != null) {
-      return Boolean.parseBoolean(System.getProperty(propertyName));
-    }
-    return getBooleanValue(propertyName, defaultValue);
-  }
-
-  public boolean useSsl() {
-    return getBoolean(ConfVars.ZEPPELIN_SSL);
-  }
-
-  public int getServerSslPort() {
-    return getInt(ConfVars.ZEPPELIN_SSL_PORT);
-  }
-
-  public boolean useClientAuth() {
-    return getBoolean(ConfVars.ZEPPELIN_SSL_CLIENT_AUTH);
-  }
-
-  public String getServerAddress() {
-    return getString(ConfVars.ZEPPELIN_ADDR);
-  }
-
-  public int getServerPort() {
-    return getInt(ConfVars.ZEPPELIN_PORT);
-  }
-
-  public String getServerContextPath() {
-    return getString(ConfVars.ZEPPELIN_SERVER_CONTEXT_PATH);
-  }
-
-  public String getKeyStorePath() {
-    String path = getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PATH);
-    if (path != null && path.startsWith("/") || isWindowsPath(path)) {
-      return path;
-    } else {
-      return getRelativeDir(
-          String.format("%s/%s",
-              getConfDir(),
-              path));
-    }
-  }
-
-  public String getKeyStoreType() {
-    return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_TYPE);
-  }
-
-  public String getKeyStorePassword() {
-    return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PASSWORD);
-  }
-
-  public String getKeyManagerPassword() {
-    String password = getString(ConfVars.ZEPPELIN_SSL_KEY_MANAGER_PASSWORD);
-    if (password == null) {
-      return getKeyStorePassword();
-    } else {
-      return password;
-    }
-  }
-
-  public String getTrustStorePath() {
-    String path = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PATH);
-    if (path == null) {
-      path = getKeyStorePath();
-    }
-    if (path != null && path.startsWith("/") || isWindowsPath(path)) {
-      return path;
-    } else {
-      return getRelativeDir(
-          String.format("%s/%s",
-              getConfDir(),
-              path));
-    }
-  }
-
-  public String getTrustStoreType() {
-    String type = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_TYPE);
-    if (type == null) {
-      return getKeyStoreType();
-    } else {
-      return type;
-    }
-  }
-
-  public String getTrustStorePassword() {
-    String password = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PASSWORD);
-    if (password == null) {
-      return getKeyStorePassword();
-    } else {
-      return password;
-    }
-  }
-
-  public String getNotebookDir() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
-  }
-
-  public String getUser() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER);
-  }
-
-  public String getBucketName() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_BUCKET);
-  }
-
-  public String getEndpoint() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_ENDPOINT);
-  }
-
-  public String getS3KMSKeyID() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID);
-  }
-
-  public String getS3KMSKeyRegion() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION);
-  }
-
-  public String getS3EncryptionMaterialsProviderClass() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_EMP);
-  }
-
-  public boolean isS3ServerSideEncryption() {
-    return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_S3_SSE);
-  }
-
-  public String getMongoUri() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI);
-  }
-
-  public String getMongoDatabase() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_DATABASE);
-  }
-
-  public String getMongoCollection() {
-    return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_COLLECTION);
-  }
-
-  public boolean getMongoAutoimport() {
-    return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT);
-  }
-
-  public String getInterpreterListPath() {
-    return getRelativeDir(String.format("%s/interpreter-list", getConfDir()));
-  }
-
-  public String getInterpreterDir() {
-    return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR);
-  }
-
-  public String getInterpreterJson() {
-    return getString(ConfVars.ZEPPELIN_INTERPRETER_JSON);
-  }
-
-  public String getInterpreterSettingPath() {
-    return getRelativeDir(String.format("%s/interpreter.json", getConfDir()));
-  }
-
-  public String getHeliumConfPath() {
-    return getRelativeDir(String.format("%s/helium.json", getConfDir()));
-  }
-
-  public String getHeliumRegistry() {
-    return getRelativeDir(ConfVars.ZEPPELIN_HELIUM_REGISTRY);
-  }
-
-  public String getHeliumNodeInstallerUrl() {
-    return getString(ConfVars.ZEPPELIN_HELIUM_NODE_INSTALLER_URL);
-  }
-
-  public String getHeliumNpmInstallerUrl() {
-    return getString(ConfVars.ZEPPELIN_HELIUM_NPM_INSTALLER_URL);
-  }
-
-  public String getHeliumYarnInstallerUrl() {
-    return getString(ConfVars.ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL);
-  }
-
-  public String getNotebookAuthorizationPath() {
-    return getRelativeDir(String.format("%s/notebook-authorization.json", getConfDir()));
-  }
-
-  public Boolean credentialsPersist() {
-    return getBoolean(ConfVars.ZEPPELIN_CREDENTIALS_PERSIST);
-  }
-
-  public String getCredentialsEncryptKey() {
-    return getString(ConfVars.ZEPPELIN_CREDENTIALS_ENCRYPT_KEY);
-  }
-
-  public String getCredentialsPath() {
-    return getRelativeDir(String.format("%s/credentials.json", getConfDir()));
-  }
-
-  public String getShiroPath() {
-    String shiroPath = getRelativeDir(String.format("%s/shiro.ini", getConfDir()));
-    return new File(shiroPath).exists() ? shiroPath : StringUtils.EMPTY;
-  }
-
-  public String getInterpreterRemoteRunnerPath() {
-    return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER);
-  }
-
-  public String getInterpreterLocalRepoPath() {
-    return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO);
-  }
-
-  public String getInterpreterMvnRepoPath() {
-    return getString(ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO);
-  }
-
-  public String getRelativeDir(ConfVars c) {
-    return getRelativeDir(getString(c));
-  }
-
-  public String getRelativeDir(String path) {
-    if (path != null && path.startsWith("/") || isWindowsPath(path)) {
-      return path;
-    } else {
-      return getString(ConfVars.ZEPPELIN_HOME) + "/" + path;
-    }
-  }
-
-  public String getCallbackPortRange() {
-    return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
-  }
-
-  public boolean isWindowsPath(String path){
-    return path.matches("^[A-Za-z]:\\\\.*");
-  }
-
-  public boolean isAnonymousAllowed() {
-    return getBoolean(ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED);
-  }
-
-  public boolean isNotebokPublic() {
-    return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC);
-  }
-
-  public String getConfDir() {
-    return getRelativeDir(ConfVars.ZEPPELIN_CONF_DIR);
-  }
-
-  public List<String> getAllowedOrigins()
-  {
-    if (getString(ConfVars.ZEPPELIN_ALLOWED_ORIGINS).isEmpty()) {
-      return Arrays.asList(new String[0]);
-    }
-
-    return Arrays.asList(getString(ConfVars.ZEPPELIN_ALLOWED_ORIGINS).toLowerCase().split(","));
-  }
-
-  public String getWebsocketMaxTextMessageSize() {
-    return getString(ConfVars.ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE);
-  }
-
-  public String getJettyName() {
-    return getString(ConfVars.ZEPPELIN_SERVER_JETTY_NAME);
-  }
-
-
-  public String getXFrameOptions() {
-    return getString(ConfVars.ZEPPELIN_SERVER_XFRAME_OPTIONS);
-  }
-
-  public String getXxssProtection() {
-    return getString(ConfVars.ZEPPELIN_SERVER_X_XSS_PROTECTION);
-  }
-
-  public String getStrictTransport() {
-    return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT);
-  }
-
-
-  public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf,
-                                                ConfigurationKeyPredicate predicate) {
-    Map<String, String> configurations = new HashMap<>();
-
-    for (ConfVars v : ConfVars.values()) {
-      String key = v.getVarName();
-
-      if (!predicate.apply(key)) {
-        continue;
-      }
-
-      ConfVars.VarType type = v.getType();
-      Object value = null;
-      if (type == ConfVars.VarType.BOOLEAN) {
-        value = conf.getBoolean(v);
-      } else if (type == ConfVars.VarType.LONG) {
-        value = conf.getLong(v);
-      } else if (type == ConfVars.VarType.INT) {
-        value = conf.getInt(v);
-      } else if (type == ConfVars.VarType.FLOAT) {
-        value = conf.getFloat(v);
-      } else if (type == ConfVars.VarType.STRING) {
-        value = conf.getString(v);
-      }
-
-      if (value != null) {
-        configurations.put(key, value.toString());
-      }
-    }
-    return configurations;
-  }
-
-  /**
-   * Predication whether key/value pair should be included or not
-   */
-  public interface ConfigurationKeyPredicate {
-    boolean apply(String key);
-  }
-
-  /**
-   * Wrapper class.
-   */
-  public static enum ConfVars {
-    ZEPPELIN_HOME("zeppelin.home", "./"),
-    ZEPPELIN_ADDR("zeppelin.server.addr", "0.0.0.0"),
-    ZEPPELIN_PORT("zeppelin.server.port", 8080),
-    ZEPPELIN_SERVER_CONTEXT_PATH("zeppelin.server.context.path", "/"),
-    ZEPPELIN_SSL("zeppelin.ssl", false),
-    ZEPPELIN_SSL_PORT("zeppelin.server.ssl.port", 8443),
-    ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false),
-    ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "keystore"),
-    ZEPPELIN_SSL_KEYSTORE_TYPE("zeppelin.ssl.keystore.type", "JKS"),
-    ZEPPELIN_SSL_KEYSTORE_PASSWORD("zeppelin.ssl.keystore.password", ""),
-    ZEPPELIN_SSL_KEY_MANAGER_PASSWORD("zeppelin.ssl.key.manager.password", null),
-    ZEPPELIN_SSL_TRUSTSTORE_PATH("zeppelin.ssl.truststore.path", null),
-    ZEPPELIN_SSL_TRUSTSTORE_TYPE("zeppelin.ssl.truststore.type", null),
-    ZEPPELIN_SSL_TRUSTSTORE_PASSWORD("zeppelin.ssl.truststore.password", null),
-    ZEPPELIN_WAR("zeppelin.war", "zeppelin-web/dist"),
-    ZEPPELIN_WAR_TEMPDIR("zeppelin.war.tempdir", "webapps"),
-    ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "org.apache.zeppelin.spark.SparkInterpreter,"
-        + "org.apache.zeppelin.spark.PySparkInterpreter,"
-        + "org.apache.zeppelin.rinterpreter.RRepl,"
-        + "org.apache.zeppelin.rinterpreter.KnitR,"
-        + "org.apache.zeppelin.spark.SparkRInterpreter,"
-        + "org.apache.zeppelin.spark.SparkSqlInterpreter,"
-        + "org.apache.zeppelin.spark.DepInterpreter,"
-        + "org.apache.zeppelin.markdown.Markdown,"
-        + "org.apache.zeppelin.angular.AngularInterpreter,"
-        + "org.apache.zeppelin.shell.ShellInterpreter,"
-        + "org.apache.zeppelin.livy.LivySparkInterpreter,"
-        + "org.apache.zeppelin.livy.LivySparkSQLInterpreter,"
-        + "org.apache.zeppelin.livy.LivyPySparkInterpreter,"
-        + "org.apache.zeppelin.livy.LivyPySpark3Interpreter,"
-        + "org.apache.zeppelin.livy.LivySparkRInterpreter,"
-        + "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
-        + "org.apache.zeppelin.file.HDFSFileInterpreter,"
-        + "org.apache.zeppelin.pig.PigInterpreter,"
-        + "org.apache.zeppelin.pig.PigQueryInterpreter,"
-        + "org.apache.zeppelin.flink.FlinkInterpreter,"
-        + "org.apache.zeppelin.python.PythonInterpreter,"
-        + "org.apache.zeppelin.python.PythonInterpreterPandasSql,"
-        + "org.apache.zeppelin.python.PythonCondaInterpreter,"
-        + "org.apache.zeppelin.python.PythonDockerInterpreter,"
-        + "org.apache.zeppelin.ignite.IgniteInterpreter,"
-        + "org.apache.zeppelin.ignite.IgniteSqlInterpreter,"
-        + "org.apache.zeppelin.lens.LensInterpreter,"
-        + "org.apache.zeppelin.cassandra.CassandraInterpreter,"
-        + "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
-        + "org.apache.zeppelin.kylin.KylinInterpreter,"
-        + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
-        + "org.apache.zeppelin.scalding.ScaldingInterpreter,"
-        + "org.apache.zeppelin.jdbc.JDBCInterpreter,"
-        + "org.apache.zeppelin.hbase.HbaseInterpreter,"
-        + "org.apache.zeppelin.bigquery.BigQueryInterpreter,"
-        + "org.apache.zeppelin.beam.BeamInterpreter,"
-        + "org.apache.zeppelin.scio.ScioInterpreter,"
-        + "org.apache.zeppelin.groovy.GroovyInterpreter,"
-        + "org.apache.zeppelin.neo4j.Neo4jCypherInterpreter"
-        ),
-    ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"),
-    ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
-    ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
-    ZEPPELIN_INTERPRETER_DEP_MVNREPO("zeppelin.interpreter.dep.mvnRepo",
-        "http://repo1.maven.org/maven2/"),
-    ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
-    ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
-    ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
-        + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
-        + "scalding,jdbc,hbase,bigquery,beam,pig,scio,groovy,neo4j"),
-    ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
-    ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
-    ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
-    // use specified notebook (id) as homescreen
-    ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),
-    // whether homescreen notebook will be hidden from notebook list or not
-    ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE("zeppelin.notebook.homescreen.hide", false),
-    ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"),
-    ZEPPELIN_NOTEBOOK_S3_ENDPOINT("zeppelin.notebook.s3.endpoint", "s3.amazonaws.com"),
-    ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"),
-    ZEPPELIN_NOTEBOOK_S3_EMP("zeppelin.notebook.s3.encryptionMaterialsProvider", null),
-    ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID("zeppelin.notebook.s3.kmsKeyID", null),
-    ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION("zeppelin.notebook.s3.kmsKeyRegion", null),
-    ZEPPELIN_NOTEBOOK_S3_SSE("zeppelin.notebook.s3.sse", false),
-    ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null),
-    ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
-    ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
-    ZEPPELIN_NOTEBOOK_MONGO_DATABASE("zeppelin.notebook.mongo.database", "zeppelin"),
-    ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", "notes"),
-    ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", "mongodb://localhost"),
-    ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", false),
-    ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage",
-        "org.apache.zeppelin.notebook.repo.GitNotebookRepo"),
-    ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
-    // whether by default note is public or private
-    ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true),
-    ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner",
-        System.getProperty("os.name")
-                .startsWith("Windows") ? "bin/interpreter.cmd" : "bin/interpreter.sh"),
-    // Decide when new note is created, interpreter settings will be binded automatically or not.
-    ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true),
-    ZEPPELIN_CONF_DIR("zeppelin.conf.dir", "conf"),
-    ZEPPELIN_DEP_LOCALREPO("zeppelin.dep.localrepo", "local-repo"),
-    ZEPPELIN_HELIUM_REGISTRY("zeppelin.helium.registry", "helium," + HELIUM_PACKAGE_DEFAULT_URL),
-    ZEPPELIN_HELIUM_NODE_INSTALLER_URL("zeppelin.helium.node.installer.url",
-            "https://nodejs.org/dist/"),
-    ZEPPELIN_HELIUM_NPM_INSTALLER_URL("zeppelin.helium.npm.installer.url",
-            "http://registry.npmjs.org/"),
-    ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL("zeppelin.helium.yarnpkg.installer.url",
-            "https://github.com/yarnpkg/yarn/releases/download/"),
-    // Allows a way to specify a ',' separated list of allowed origins for rest and websockets
-    // i.e. http://localhost:8080
-    ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"),
-    ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true),
-    ZEPPELIN_CREDENTIALS_PERSIST("zeppelin.credentials.persist", true),
-    ZEPPELIN_CREDENTIALS_ENCRYPT_KEY("zeppelin.credentials.encryptKey", null),
-    ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"),
-    ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED("zeppelin.server.default.dir.allowed", false),
-    ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"),
-    ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", null),
-    ZEPPELIN_SERVER_STRICT_TRANSPORT("zeppelin.server.strict.transport", "max-age=631138519"),
-    ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
-
-    ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""),
-    ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
-
-    ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":");
-
-    private String varName;
-    @SuppressWarnings("rawtypes")
-    private Class varClass;
-    private String stringValue;
-    private VarType type;
-    private int intValue;
-    private float floatValue;
-    private boolean booleanValue;
-    private long longValue;
-
-
-    ConfVars(String varName, String varValue) {
-      this.varName = varName;
-      this.varClass = String.class;
-      this.stringValue = varValue;
-      this.intValue = -1;
-      this.floatValue = -1;
-      this.longValue = -1;
-      this.booleanValue = false;
-      this.type = VarType.STRING;
-    }
-
-    ConfVars(String varName, int intValue) {
-      this.varName = varName;
-      this.varClass = Integer.class;
-      this.stringValue = null;
-      this.intValue = intValue;
-      this.floatValue = -1;
-      this.longValue = -1;
-      this.booleanValue = false;
-      this.type = VarType.INT;
-    }
-
-    ConfVars(String varName, long longValue) {
-      this.varName = varName;
-      this.varClass = Integer.class;
-      this.stringValue = null;
-      this.intValue = -1;
-      this.floatValue = -1;
-      this.longValue = longValue;
-      this.booleanValue = false;
-      this.type = VarType.LONG;
-    }
-
-    ConfVars(String varName, float floatValue) {
-      this.varName = varName;
-      this.varClass = Float.class;
-      this.stringValue = null;
-      this.intValue = -1;
-      this.longValue = -1;
-      this.floatValue = floatValue;
-      this.booleanValue = false;
-      this.type = VarType.FLOAT;
-    }
-
-    ConfVars(String varName, boolean booleanValue) {
-      this.varName = varName;
-      this.varClass = Boolean.class;
-      this.stringValue = null;
-      this.intValue = -1;
-      this.longValue = -1;
-      this.floatValue = -1;
-      this.booleanValue = booleanValue;
-      this.type = VarType.BOOLEAN;
-    }
-
-    public String getVarName() {
-      return varName;
-    }
-
-    @SuppressWarnings("rawtypes")
-    public Class getVarClass() {
-      return varClass;
-    }
-
-    public int getIntValue() {
-      return intValue;
-    }
-
-    public long getLongValue() {
-      return longValue;
-    }
-
-    public float getFloatValue() {
-      return floatValue;
-    }
-
-    public String getStringValue() {
-      return stringValue;
-    }
-
-    public boolean getBooleanValue() {
-      return booleanValue;
-    }
-
-    public VarType getType() {
-      return type;
-    }
-
-    enum VarType {
-      STRING {
-        @Override
-        void checkType(String value) throws Exception {}
-      },
-      INT {
-        @Override
-        void checkType(String value) throws Exception {
-          Integer.valueOf(value);
-        }
-      },
-      LONG {
-        @Override
-        void checkType(String value) throws Exception {
-          Long.valueOf(value);
-        }
-      },
-      FLOAT {
-        @Override
-        void checkType(String value) throws Exception {
-          Float.valueOf(value);
-        }
-      },
-      BOOLEAN {
-        @Override
-        void checkType(String value) throws Exception {
-          Boolean.valueOf(value);
-        }
-      };
-
-      boolean isType(String value) {
-        try {
-          checkType(value);
-        } catch (Exception e) {
-          LOG.error("Exception in ZeppelinConfiguration while isType", e);
-          return false;
-        }
-        return true;
-      }
-
-      String typeString() {
-        return name().toUpperCase();
-      }
-
-      abstract void checkType(String value) throws Exception;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index f020919..7233239 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -77,7 +77,7 @@ public class InterpreterFactory {
           return interpreter;
         }
       }
-      throw new InterpreterException(replName + " interpreter not found");
+      return null;
 
     } else {
       // first assume replName is 'name' of interpreter. ('groupName' is ommitted)

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 5af01dc..a82d5bf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -18,7 +18,6 @@
 package org.apache.zeppelin.interpreter;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.gson.JsonArray;
@@ -34,19 +33,22 @@ import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
 import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext;
+import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
+import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
+import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FilenameFilter;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URL;
@@ -58,7 +60,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -132,6 +133,10 @@ public class InterpreterSetting {
 
   private transient ZeppelinConfiguration conf = new ZeppelinConfiguration();
 
+  // TODO(zjffdu) ShellScriptLauncher is the only launcher implementation for now. There could be
+  // other launchers in future when we have other launcher implementations, e.g. a third party
+  // launcher service like livy
+  private transient InterpreterLauncher launcher;
   ///////////////////////////////////////////////////////////////////////////////////////////
 
 
@@ -243,6 +248,7 @@ public class InterpreterSetting {
   }
 
   void postProcessing() {
+//    createLauncher();
     this.status = Status.READY;
   }
 
@@ -266,6 +272,14 @@ public class InterpreterSetting {
     this.conf = o.getConf();
   }
 
+  private void createLauncher() {
+    if (group.equals("spark")) {
+      this.launcher = new SparkInterpreterLauncher(this.conf);
+    } else {
+      this.launcher = new ShellScriptLauncher(this.conf);
+    }
+  }
+
   public AngularObjectRegistryListener getAngularObjectRegistryListener() {
     return angularObjectRegistryListener;
   }
@@ -626,152 +640,17 @@ public class InterpreterSetting {
     }
     return interpreters;
   }
-  
-  RemoteInterpreterProcess createInterpreterProcess() {
-    RemoteInterpreterProcess remoteInterpreterProcess = null;
-    int connectTimeout =
-        conf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
-    String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id;
-    if (option.isExistingProcess()) {
-      // TODO(zjffdu) remove the existing process approach seems no one is using this.
-      // use the existing process
-      remoteInterpreterProcess = new RemoteInterpreterRunningProcess(
-          connectTimeout,
-          remoteInterpreterProcessListener,
-          appEventListener,
-          option.getHost(),
-          option.getPort());
-    } else {
-      // create new remote process
-      remoteInterpreterProcess = new RemoteInterpreterManagedProcess(
-          interpreterRunner != null ? interpreterRunner.getPath() :
-              conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
-          interpreterDir, localRepoPath,
-          getEnvFromInterpreterProperty(), connectTimeout,
-          remoteInterpreterProcessListener, appEventListener, group);
-    }
-    return remoteInterpreterProcess;
-  }
-
-  private boolean isSparkConf(String key, String value) {
-    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
-  }
-
-  private Map<String, String> getEnvFromInterpreterProperty() {
-    Map<String, String> env = new HashMap<String, String>();
-    Properties javaProperties = getJavaProperties();
-    Properties sparkProperties = new Properties();
-    String sparkMaster = getSparkMaster();
-    for (String key : javaProperties.stringPropertyNames()) {
-      if (RemoteInterpreterUtils.isEnvString(key)) {
-        env.put(key, javaProperties.getProperty(key));
-      }
-      if (isSparkConf(key, javaProperties.getProperty(key))) {
-        sparkProperties.setProperty(key, toShellFormat(javaProperties.getProperty(key)));
-      }
-    }
-
-    setupPropertiesForPySpark(sparkProperties);
-    setupPropertiesForSparkR(sparkProperties, System.getenv("SPARK_HOME"));
-    if (isYarnMode() && getDeployMode().equals("cluster")) {
-      env.put("SPARK_YARN_CLUSTER", "true");
-    }
-
-    StringBuilder sparkConfBuilder = new StringBuilder();
-    if (sparkMaster != null) {
-      sparkConfBuilder.append(" --master " + sparkMaster);
-    }
-    if (isYarnMode() && getDeployMode().equals("cluster")) {
-      sparkConfBuilder.append(" --files " + conf.getConfDir() + "/log4j_yarn_cluster.properties");
-    }
-    for (String name : sparkProperties.stringPropertyNames()) {
-      sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
-    }
-
-    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
-    LOGGER.debug("getEnvFromInterpreterProperty: " + env);
-    return env;
-  }
-
-  private void setupPropertiesForPySpark(Properties sparkProperties) {
-    if (isYarnMode()) {
-      sparkProperties.setProperty("spark.yarn.isPython", "true");
-    }
-  }
 
-  private void mergeSparkProperty(Properties sparkProperties, String propertyName,
-                                  String propertyValue) {
-    if (sparkProperties.containsKey(propertyName)) {
-      String oldPropertyValue = sparkProperties.getProperty(propertyName);
-      sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
-    } else {
-      sparkProperties.setProperty(propertyName, propertyValue);
-    }
-  }
-
-  private void setupPropertiesForSparkR(Properties sparkProperties,
-                                        String sparkHome) {
-    File sparkRBasePath = null;
-    if (sparkHome == null) {
-      if (!getSparkMaster().startsWith("local")) {
-        throw new RuntimeException("SPARK_HOME is not specified for non-local mode");
-      }
-      String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
-      sparkRBasePath = new File(zeppelinHome,
-          "interpreter" + File.separator + "spark" + File.separator + "R");
-    } else {
-      sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
-    }
-
-    File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
-    if (sparkRPath.exists() && sparkRPath.isFile()) {
-      mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
-    } else {
-      LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
-    }
-  }
-
-  private String getSparkMaster() {
-    String master = getJavaProperties().getProperty("master");
-    if (master == null) {
-      master = getJavaProperties().getProperty("spark.master", "local[*]");
-    }
-    return master;
-  }
-
-  private String getDeployMode() {
-    String master = getSparkMaster();
-    if (master.equals("yarn-client")) {
-      return "client";
-    } else if (master.equals("yarn-cluster")) {
-      return "cluster";
-    } else if (master.startsWith("local")) {
-      return "client";
-    } else {
-      String deployMode = getJavaProperties().getProperty("spark.submit.deployMode");
-      if (deployMode == null) {
-        throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
-            "is not specified");
-      }
-      if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
-        throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
-      }
-      return deployMode;
-    }
-  }
-
-  private boolean isYarnMode() {
-    return getSparkMaster().startsWith("yarn");
-  }
-
-  private String toShellFormat(String value) {
-    if (value.contains("\'") && value.contains("\"")) {
-      throw new RuntimeException("Spark property value could not contain both \" and '");
-    } else if (value.contains("\'")) {
-      return "\"" + value + "\"";
-    } else {
-      return "\'" + value + "\'";
+  synchronized RemoteInterpreterProcess createInterpreterProcess() throws IOException {
+    if (launcher == null) {
+      createLauncher();
     }
+    InterpreterLaunchContext launchContext = new
+        InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, name);
+    RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
+    process.setRemoteInterpreterEventPoller(
+        new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
+    return process;
   }
 
   private List<Interpreter> getOrCreateSession(String user, String noteId) {
@@ -815,8 +694,7 @@ public class InterpreterSetting {
     return null;
   }
 
-  private ManagedInterpreterGroup createInterpreterGroup(String groupId)
-      throws InterpreterException {
+  private ManagedInterpreterGroup createInterpreterGroup(String groupId) {
     AngularObjectRegistry angularObjectRegistry;
     ManagedInterpreterGroup interpreterGroup = new ManagedInterpreterGroup(groupId, this);
     angularObjectRegistry =
@@ -938,7 +816,8 @@ public class InterpreterSetting {
           );
           newProperties.put(key, property);
         } else {
-          throw new RuntimeException("Can not convert this type of property: " + value.getClass());
+          throw new RuntimeException("Can not convert this type of property: " +
+              value.getClass());
         }
       }
       return newProperties;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 9dfce21..f34195d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -744,11 +744,12 @@ public class InterpreterSettingManager {
   }
 
   /**
-   * Change interpreter property and restart
+   * Change interpreter properties and restart
    */
   public void setPropertyAndRestart(String id, InterpreterOption option,
                                     Map<String, InterpreterProperty> properties,
-                                    List<Dependency> dependencies) throws IOException {
+                                    List<Dependency> dependencies)
+      throws InterpreterException, IOException {
     synchronized (interpreterSettings) {
       InterpreterSetting intpSetting = interpreterSettings.get(id);
       if (intpSetting != null) {
@@ -761,7 +762,7 @@ public class InterpreterSettingManager {
           saveToFile();
         } catch (Exception e) {
           loadFromFile();
-          throw e;
+          throw new IOException(e);
         }
       } else {
         throw new InterpreterException("Interpreter setting id " + id + " not found");
@@ -770,7 +771,7 @@ public class InterpreterSettingManager {
   }
 
   // restart in note page
-  public void restart(String settingId, String noteId, String user) {
+  public void restart(String settingId, String noteId, String user) throws InterpreterException {
     InterpreterSetting intpSetting = interpreterSettings.get(settingId);
     Preconditions.checkNotNull(intpSetting);
     synchronized (interpreterSettings) {
@@ -794,7 +795,7 @@ public class InterpreterSettingManager {
     }
   }
 
-  public void restart(String id) {
+  public void restart(String id) throws InterpreterException {
     restart(id, "", "anonymous");
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index 1d7d916..ff9cb1c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -25,6 +25,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
@@ -52,7 +53,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
     return interpreterSetting;
   }
 
-  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
+  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
     if (remoteInterpreterProcess == null) {
       LOGGER.info("Create InterperterProcess for InterpreterGroup: " + getId());
       remoteInterpreterProcess = interpreterSetting.createInterpreterProcess();
@@ -112,7 +113,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
         LOGGER.info("Job " + job.getJobName() + " aborted ");
       }
 
-      interpreter.close();
+      try {
+        interpreter.close();
+      } catch (InterpreterException e) {
+        LOGGER.warn("Fail to close interpreter " + interpreter.getClassName(), e);
+      }
       //TODO(zjffdu) move the close of schedule to Interpreter
       if (null != scheduler) {
         SchedulerFactory.singleton().removeScheduler(scheduler.getName());

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
new file mode 100644
index 0000000..f419967
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterRunner;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Interpreter launcher which uses a shell script to launch the interpreter process.
+ *
+ */
+public class ShellScriptLauncher extends InterpreterLauncher {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class);
+
+  public ShellScriptLauncher(ZeppelinConfiguration zConf) {
+    super(zConf);
+  }
+
+  @Override
+  public InterpreterClient launch(InterpreterLaunchContext context) {
+    LOGGER.info("Launching Interpreter: " + context.getInterpreterGroupName());
+    this.properties = context.getProperties();
+    InterpreterOption option = context.getOption();
+    InterpreterRunner runner = context.getRunner();
+    String groupName = context.getInterpreterGroupName();
+
+    int connectTimeout =
+        zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+    if (option.isExistingProcess()) {
+      return new RemoteInterpreterRunningProcess(
+          connectTimeout,
+          option.getHost(),
+          option.getPort());
+    } else {
+      // create new remote process
+      String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+          + context.getInterpreterGroupId();
+      return new RemoteInterpreterManagedProcess(
+          runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
+          zConf.getCallbackPortRange(),
+          zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
+          buildEnvFromProperties(), connectTimeout, groupName);
+    }
+  }
+
+  protected Map<String, String> buildEnvFromProperties() {
+    Map<String, String> env = new HashMap<>();
+    for (Object key : properties.keySet()) {
+      if (RemoteInterpreterUtils.isEnvString((String) key)) {
+        env.put((String) key, properties.getProperty((String) key));
+      }
+    }
+    return env;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
new file mode 100644
index 0000000..32a0530
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Spark specific launcher.
+ */
+public class SparkInterpreterLauncher extends ShellScriptLauncher {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
+
+  public SparkInterpreterLauncher(ZeppelinConfiguration zConf) {
+    super(zConf);
+  }
+
+  @Override
+  protected Map<String, String> buildEnvFromProperties() {
+    Map<String, String> env = new HashMap<String, String>();
+    Properties sparkProperties = new Properties();
+    String sparkMaster = getSparkMaster(properties);
+    for (String key : properties.stringPropertyNames()) {
+      if (RemoteInterpreterUtils.isEnvString(key)) {
+        env.put(key, properties.getProperty(key));
+      }
+      if (isSparkConf(key, properties.getProperty(key))) {
+        sparkProperties.setProperty(key, toShellFormat(properties.getProperty(key)));
+      }
+    }
+
+    setupPropertiesForPySpark(sparkProperties);
+    setupPropertiesForSparkR(sparkProperties);
+    if (isYarnMode() && getDeployMode().equals("cluster")) {
+      env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
+    }
+
+    StringBuilder sparkConfBuilder = new StringBuilder();
+    if (sparkMaster != null) {
+      sparkConfBuilder.append(" --master " + sparkMaster);
+    }
+    if (isYarnMode() && getDeployMode().equals("cluster")) {
+      sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_yarn_cluster.properties");
+    }
+    for (String name : sparkProperties.stringPropertyNames()) {
+      sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
+    }
+
+    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+
+    // set these env in the order of
+    // 1. interpreter-setting
+    // 2. zeppelin-env.sh
+    // It is encouraged to set env in interpreter setting, but just for backward compatibility,
+    // we also fall back to zeppelin-env.sh if it is not specified in interpreter setting.
+    for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"})  {
+      String envValue = getEnv(envName);
+      if (envValue != null) {
+        env.put(envName, envValue);
+      }
+    }
+    LOGGER.debug("buildEnvFromProperties: " + env);
+    return env;
+
+  }
+
+
+  /**
+   * Get the environment variable, looking it up in the following order:
+   *
+   * 1. interpreter setting
+   * 2. zeppelin-env.sh
+   *
+   */
+  private String getEnv(String envName) {
+    String env = properties.getProperty(envName);
+    if (env == null) {
+      env = System.getenv(envName);
+    }
+    return env;
+  }
+
+  private boolean isSparkConf(String key, String value) {
+    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
+  }
+
+  private void setupPropertiesForPySpark(Properties sparkProperties) {
+    if (isYarnMode()) {
+      sparkProperties.setProperty("spark.yarn.isPython", "true");
+    }
+  }
+
+  private void mergeSparkProperty(Properties sparkProperties, String propertyName,
+                                  String propertyValue) {
+    if (sparkProperties.containsKey(propertyName)) {
+      String oldPropertyValue = sparkProperties.getProperty(propertyName);
+      sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
+    } else {
+      sparkProperties.setProperty(propertyName, propertyValue);
+    }
+  }
+
+  private void setupPropertiesForSparkR(Properties sparkProperties) {
+    String sparkHome = getEnv("SPARK_HOME");
+    File sparkRBasePath = null;
+    if (sparkHome == null) {
+      if (!getSparkMaster(properties).startsWith("local")) {
+        throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
+            " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
+            " interpreter setting");
+      }
+      String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
+      sparkRBasePath = new File(zeppelinHome,
+          "interpreter" + File.separator + "spark" + File.separator + "R");
+    } else {
+      sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
+    }
+
+    File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
+    if (sparkRPath.exists() && sparkRPath.isFile()) {
+      mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
+    } else {
+      LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
+    }
+  }
+
+  /**
+   * Order to look for the spark master:
+   * 1. master in interpreter setting
+   * 2. spark.master in interpreter setting
+   * 3. use local[*]
+   * @param properties interpreter properties to look the master up in
+   * @return the spark master, never null
+   */
+  private String getSparkMaster(Properties properties) {
+    String master = properties.getProperty("master");
+    if (master == null) {
+      master = properties.getProperty("spark.master");
+      if (master == null) {
+        master = "local[*]";
+      }
+    }
+    return master;
+  }
+
+  private String getDeployMode() {
+    String master = getSparkMaster(properties);
+    if (master.equals("yarn-client")) {
+      return "client";
+    } else if (master.equals("yarn-cluster")) {
+      return "cluster";
+    } else if (master.startsWith("local")) {
+      return "client";
+    } else {
+      String deployMode = properties.getProperty("spark.submit.deployMode");
+      if (deployMode == null) {
+        throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
+            "is not specified");
+      }
+      if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
+        throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
+      }
+      return deployMode;
+    }
+  }
+
+  private boolean isYarnMode() {
+    return getSparkMaster(properties).startsWith("yarn");
+  }
+
+  private String toShellFormat(String value) {
+    if (value.contains("\'") && value.contains("\"")) {
+      throw new RuntimeException("Spark property value could not contain both \" and '");
+    } else if (value.contains("\'")) {
+      return "\"" + value + "\"";
+    } else {
+      return "\'" + value + "\'";
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
index 064abd5..7653824 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
@@ -82,7 +82,7 @@ public class InterpreterContextRunnerPool {
         }
       }
 
-      throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId);
+      throw new RuntimeException("Can not run paragraph " + paragraphId + " on " + noteId);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 54bf9e1..b479799 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.display.Input;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -42,6 +43,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -90,7 +92,7 @@ public class RemoteInterpreter extends Interpreter {
     return this.sessionId;
   }
 
-  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
+  public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
     if (this.interpreterProcess != null) {
       return this.interpreterProcess;
     }
@@ -113,7 +115,7 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   @Override
-  public void open() {
+  public void open() throws InterpreterException {
     synchronized (this) {
       if (!isOpened) {
         // create all the interpreters of the same session first, then Open the internal interpreter
@@ -123,7 +125,11 @@ public class RemoteInterpreter extends Interpreter {
         // also see method Interpreter.getInterpreterInTheSameSessionByClassName
         for (Interpreter interpreter : getInterpreterGroup()
                                         .getOrCreateSession(userName, sessionId)) {
-          ((RemoteInterpreter) interpreter).internal_create();
+          try {
+            ((RemoteInterpreter) interpreter).internal_create();
+          } catch (IOException e) {
+            throw new InterpreterException(e);
+          }
         }
 
         interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@@ -147,7 +153,7 @@ public class RemoteInterpreter extends Interpreter {
     }
   }
 
-  private void internal_create() {
+  private void internal_create() throws IOException {
     synchronized (this) {
       if (!isCreated) {
         RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
@@ -156,7 +162,7 @@ public class RemoteInterpreter extends Interpreter {
           public Void call(Client client) throws Exception {
             LOGGER.info("Create RemoteInterpreter {}", getClassName());
             client.createInterpreter(getInterpreterGroup().getId(), sessionId,
-                className, (Map) property, userName);
+                className, (Map) getProperties(), userName);
             return null;
           }
         });
@@ -167,9 +173,14 @@ public class RemoteInterpreter extends Interpreter {
 
 
   @Override
-  public void close() {
+  public void close() throws InterpreterException {
     if (isOpened) {
-      RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+      RemoteInterpreterProcess interpreterProcess = null;
+      try {
+        interpreterProcess = getOrCreateInterpreterProcess();
+      } catch (IOException e) {
+        throw new InterpreterException(e);
+      }
       interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
         @Override
         public Void call(Client client) throws Exception {
@@ -184,13 +195,19 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   @Override
-  public InterpreterResult interpret(final String st, final InterpreterContext context) {
+  public InterpreterResult interpret(final String st, final InterpreterContext context)
+      throws InterpreterException {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("st:\n{}", st);
     }
 
     final FormType form = getFormType();
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    RemoteInterpreterProcess interpreterProcess = null;
+    try {
+      interpreterProcess = getOrCreateInterpreterProcess();
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
     InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
         .getInterpreterContextRunnerPool();
     List<InterpreterContextRunner> runners = context.getRunners();
@@ -238,12 +255,17 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   @Override
-  public void cancel(final InterpreterContext context) {
+  public void cancel(final InterpreterContext context) throws InterpreterException {
     if (!isOpened) {
       LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className);
       return;
     }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    RemoteInterpreterProcess interpreterProcess = null;
+    try {
+      interpreterProcess = getOrCreateInterpreterProcess();
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
     interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
       @Override
       public Void call(Client client) throws Exception {
@@ -254,7 +276,7 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   @Override
-  public FormType getFormType() {
+  public FormType getFormType() throws InterpreterException {
     if (formType != null) {
       return formType;
     }
@@ -265,7 +287,12 @@ public class RemoteInterpreter extends Interpreter {
         open();
       }
     }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    RemoteInterpreterProcess interpreterProcess = null;
+    try {
+      interpreterProcess = getOrCreateInterpreterProcess();
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
     FormType type = interpreterProcess.callRemoteFunction(
         new RemoteInterpreterProcess.RemoteFunction<FormType>() {
           @Override
@@ -277,13 +304,19 @@ public class RemoteInterpreter extends Interpreter {
     return type;
   }
 
+
   @Override
-  public int getProgress(final InterpreterContext context) {
+  public int getProgress(final InterpreterContext context) throws InterpreterException {
     if (!isOpened) {
       LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className);
       return 0;
     }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    RemoteInterpreterProcess interpreterProcess = null;
+    try {
+      interpreterProcess = getOrCreateInterpreterProcess();
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
     return interpreterProcess.callRemoteFunction(
         new RemoteInterpreterProcess.RemoteFunction<Integer>() {
           @Override
@@ -296,12 +329,18 @@ public class RemoteInterpreter extends Interpreter {
 
   @Override
   public List<InterpreterCompletion> completion(final String buf, final int cursor,
-                                                final InterpreterContext interpreterContext) {
+                                                final InterpreterContext interpreterContext)
+      throws InterpreterException {
     if (!isOpened) {
       LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className);
       return new ArrayList<>();
     }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    RemoteInterpreterProcess interpreterProcess = null;
+    try {
+      interpreterProcess = getOrCreateInterpreterProcess();
+    } catch (IOException e) {
+      throw new InterpreterException(e);
+    }
     return interpreterProcess.callRemoteFunction(
         new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
           @Override
@@ -317,7 +356,12 @@ public class RemoteInterpreter extends Interpreter {
       LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className);
       return Job.Status.UNKNOWN.name();
     }
-    RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+    RemoteInterpreterProcess interpreterProcess = null;
+    try {
+      interpreterProcess = getOrCreateInterpreterProcess();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
     return interpreterProcess.callRemoteFunction(
         new RemoteInterpreterProcess.RemoteFunction<String>() {
           @Override
@@ -331,7 +375,7 @@ public class RemoteInterpreter extends Interpreter {
   @Override
   public Scheduler getScheduler() {
     int maxConcurrency = Integer.parseInt(
-        property.getProperty("zeppelin.interpreter.max.poolsize",
+        getProperty("zeppelin.interpreter.max.poolsize",
             ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + ""));
 
     Scheduler s = new RemoteScheduler(

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index d21a962..6e26e58 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.interpreter.remote;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.DefaultExecutor;
 import org.apache.commons.exec.ExecuteException;
@@ -73,11 +74,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
       String localRepoDir,
       Map<String, String> env,
       int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener,
       String interpreterGroupName) {
-    super(new RemoteInterpreterEventPoller(listener, appListener),
-        connectTimeout);
+    super(connectTimeout);
     this.interpreterRunner = intpRunner;
     this.portRange = portRange;
     this.env = env;
@@ -86,23 +84,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
     this.interpreterGroupName = interpreterGroupName;
   }
 
-  RemoteInterpreterManagedProcess(String intpRunner,
-                                  String intpDir,
-                                  String localRepoDir,
-                                  Map<String, String> env,
-                                  RemoteInterpreterEventPoller remoteInterpreterEventPoller,
-                                  int connectTimeout,
-                                  String interpreterGroupName) {
-    super(remoteInterpreterEventPoller,
-        connectTimeout);
-    this.interpreterRunner = intpRunner;
-    this.portRange = ":";
-    this.env = env;
-    this.interpreterDir = intpDir;
-    this.localRepoDir = localRepoDir;
-    this.interpreterGroupName = interpreterGroupName;
-  }
-
   @Override
   public String getHost() {
     return "localhost";
@@ -124,7 +105,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
       callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
       callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
     } catch (IOException e1) {
-      throw new InterpreterException(e1);
+      throw new RuntimeException(e1);
     }
 
     logger.info("Thrift server for callback will start. Port: {}", callbackPort);
@@ -206,7 +187,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
       executor.execute(cmdLine, procEnv, this);
     } catch (IOException e) {
       running.set(false);
-      throw new InterpreterException(e);
+      throw new RuntimeException(e);
     }
 
     try {
@@ -217,7 +198,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
       }
       if (!running.get()) {
         callbackServer.stop();
-        throw new InterpreterException("Cannot run interpreter");
+        throw new RuntimeException(new String(cmdOut.toByteArray()));
       }
     } catch (InterruptedException e) {
       logger.error("Remote interpreter is not accessible");
@@ -227,7 +208,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
 
   public void stop() {
     // shutdown EventPoller first.
-    this.remoteInterpreterEventPoller.shutdown();
+    this.getRemoteInterpreterEventPoller().shutdown();
     if (callbackServer.isServing()) {
       callbackServer.stop();
     }
@@ -266,6 +247,31 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
     running.set(false);
   }
 
+  @VisibleForTesting
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  @VisibleForTesting
+  public String getLocalRepoDir() {
+    return localRepoDir;
+  }
+
+  @VisibleForTesting
+  public String getInterpreterDir() {
+    return interpreterDir;
+  }
+
+  @VisibleForTesting
+  public String getInterpreterGroupName() {
+    return interpreterGroupName;
+  }
+
+  @VisibleForTesting
+  public String getInterpreterRunner() {
+    return interpreterRunner;
+  }
+
   public boolean isRunning() {
     return running.get();
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index e45f15b..88cc489 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -21,6 +21,7 @@ import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,27 +29,17 @@ import org.slf4j.LoggerFactory;
 /**
  * Abstract class for interpreter process
  */
-public abstract class RemoteInterpreterProcess {
+public abstract class RemoteInterpreterProcess implements InterpreterClient {
   private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
 
   private GenericObjectPool<Client> clientPool;
-  protected final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
+  private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
   private final InterpreterContextRunnerPool interpreterContextRunnerPool;
   private int connectTimeout;
 
   public RemoteInterpreterProcess(
-      int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener) {
-    this(new RemoteInterpreterEventPoller(listener, appListener),
-        connectTimeout);
-    this.remoteInterpreterEventPoller.setInterpreterProcess(this);
-  }
-
-  RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
-                           int connectTimeout) {
+      int connectTimeout) {
     this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
-    this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
     this.connectTimeout = connectTimeout;
   }
 
@@ -56,6 +47,10 @@ public abstract class RemoteInterpreterProcess {
     return remoteInterpreterEventPoller;
   }
 
+  public void setRemoteInterpreterEventPoller(RemoteInterpreterEventPoller eventPoller) {
+    this.remoteInterpreterEventPoller = eventPoller;
+  }
+
   public abstract String getHost();
   public abstract int getPort();
   public abstract void start(String userName, Boolean isUserImpersonate);
@@ -147,9 +142,9 @@ public abstract class RemoteInterpreterProcess {
       }
     } catch (TException e) {
       broken = true;
-      throw new InterpreterException(e);
+      throw new RuntimeException(e);
     } catch (Exception e1) {
-      throw new InterpreterException(e1);
+      throw new RuntimeException(e1);
     } finally {
       if (client != null) {
         releaseClient(client, broken);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
index bb176be..d8715a0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -30,12 +30,10 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
 
   public RemoteInterpreterRunningProcess(
       int connectTimeout,
-      RemoteInterpreterProcessListener listener,
-      ApplicationEventListener appListener,
       String host,
       int port
   ) {
-    super(connectTimeout, listener, appListener);
+    super(connectTimeout);
     this.host = host;
     this.port = port;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 03c5046..b5dda67 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -627,7 +627,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
     if (intp == null) {
       String intpExceptionMsg =
           p.getJobName() + "'s Interpreter " + requiredReplName + " not found";
-      InterpreterException intpException = new InterpreterException(intpExceptionMsg);
+      RuntimeException intpException = new RuntimeException(intpExceptionMsg);
       InterpreterResult intpResult =
           new InterpreterResult(InterpreterResult.Code.ERROR, intpException.getMessage());
       p.setReturn(intpResult, intpException);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 4652fcd..77fd04c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -892,7 +892,11 @@ public class Notebook implements NoteEventListener {
       if (releaseResource) {
         for (InterpreterSetting setting : notebook.getInterpreterSettingManager()
             .getInterpreterSettings(note.getId())) {
-          notebook.getInterpreterSettingManager().restart(setting.getId());
+          try {
+            notebook.getInterpreterSettingManager().restart(setting.getId());
+          } catch (InterpreterException e) {
+            logger.error("Fail to restart interpreter: " + setting.getId(), e);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 68ce794..701943a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -312,15 +312,14 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
     String replName = getRequiredReplName(trimmedBuffer);
 
     String body = getScriptBody(trimmedBuffer);
-    Interpreter repl = getRepl(replName);
-    if (repl == null) {
-      return null;
-    }
-
     InterpreterContext interpreterContext = getInterpreterContextWithoutRunner(null);
 
-    List completion = repl.completion(body, cursor, interpreterContext);
-    return completion;
+    try {
+      Interpreter repl = getRepl(replName);
+      return repl.completion(body, cursor, interpreterContext);
+    } catch (InterpreterException e) {
+      throw new RuntimeException("Fail to get completion", e);
+    }
   }
 
   public int calculateCursorPosition(String buffer, String trimmedBuffer, int cursor) {
@@ -362,11 +361,15 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
   @Override
   public int progress() {
     String replName = getRequiredReplName();
-    Interpreter repl = getRepl(replName);
-    if (repl != null) {
+
+    try {
+      Interpreter repl = getRepl(replName);
+      if (repl == null) {
+        return 0;
+      }
       return repl.getProgress(getInterpreterContext(null));
-    } else {
-      return 0;
+    } catch (InterpreterException e) {
+      throw new RuntimeException("Fail to get progress", e);
     }
   }
 
@@ -494,10 +497,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
   protected boolean jobAbort() {
     Interpreter repl = getRepl(getRequiredReplName());
     if (repl == null) {
-      // when interpreters are already destroyed
       return true;
     }
-
     Scheduler scheduler = repl.getScheduler();
     if (scheduler == null) {
       return true;
@@ -507,7 +508,11 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
     if (job != null) {
       job.setStatus(Status.ABORT);
     } else {
-      repl.cancel(getInterpreterContextWithoutRunner(null));
+      try {
+        repl.cancel(getInterpreterContextWithoutRunner(null));
+      } catch (InterpreterException e) {
+        throw new RuntimeException(e);
+      }
     }
     return true;
   }
@@ -738,12 +743,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
   }
 
   private boolean isValidInterpreter(String replName) {
-    try {
-      return factory.getInterpreter(user, note.getId(), replName) != null;
-    } catch (InterpreterException e) {
-      // ignore this exception, it would be recaught when running paragraph.
-      return false;
-    }
+    return factory.getInterpreter(user, note.getId(), replName) != null;
   }
 
   public void updateRuntimeInfos(String label, String tooltip, Map<String, String> infos,

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
index 8f3e615..f23d433 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.helium;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
 import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.notebook.ApplicationState;
@@ -241,7 +242,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
 
 
   @Test
-  public void testUnloadOnInterpreterRestart() throws IOException {
+  public void testUnloadOnInterpreterRestart() throws IOException, InterpreterException {
     // given
     HeliumPackage pkg1 = new HeliumPackage(HeliumType.APPLICATION,
         "name1",