You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/10/14 05:46:16 UTC
[2/4] zeppelin git commit: ZEPPELIN-2685. Improvement on Interpreter
class
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
index c1dba5c..e2a10e6 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java
@@ -123,7 +123,7 @@ public class InterpreterRestApi {
request.getOption(), request.getProperties());
logger.info("new setting created with {}", interpreterSetting.getId());
return new JsonResponse<>(Status.OK, "", interpreterSetting).build();
- } catch (InterpreterException | IOException e) {
+ } catch (IOException e) {
logger.error("Exception in InterpreterRestApi while creating ", e);
return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e))
.build();
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 3e46449..2fa584b 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.Note;
@@ -171,7 +172,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
}
@Test
- public void sparkRTest() throws IOException {
+ public void sparkRTest() throws IOException, InterpreterException {
// create new note
Note note = ZeppelinServer.notebook.createNote(anonymous);
int sparkVersion = getSparkVersionNumber(note);
@@ -426,7 +427,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
}
@Test
- public void pySparkDepLoaderTest() throws IOException {
+ public void pySparkDepLoaderTest() throws IOException, InterpreterException {
// create new note
Note note = ZeppelinServer.notebook.createNote(anonymous);
int sparkVersionNumber = getSparkVersionNumber(note);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index c67df6b..d1a2270 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -71,11 +71,6 @@
</dependency>
<dependency>
- <groupId>commons-configuration</groupId>
- <artifactId>commons-configuration</artifactId>
- </dependency>
-
- <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
deleted file mode 100644
index 3a82bc5..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ /dev/null
@@ -1,847 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.conf;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.commons.configuration.tree.ConfigurationNode;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zeppelin.util.Util;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Zeppelin configuration.
- *
- */
-public class ZeppelinConfiguration extends XMLConfiguration {
- private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";
- private static final long serialVersionUID = 4749305895693848035L;
- private static final Logger LOG = LoggerFactory.getLogger(ZeppelinConfiguration.class);
-
- private static final String HELIUM_PACKAGE_DEFAULT_URL =
- "https://s3.amazonaws.com/helium-package/helium.json";
- private static ZeppelinConfiguration conf;
-
- public ZeppelinConfiguration(URL url) throws ConfigurationException {
- setDelimiterParsingDisabled(true);
- load(url);
- }
-
- public ZeppelinConfiguration() {
- ConfVars[] vars = ConfVars.values();
- for (ConfVars v : vars) {
- if (v.getType() == ConfVars.VarType.BOOLEAN) {
- this.setProperty(v.getVarName(), v.getBooleanValue());
- } else if (v.getType() == ConfVars.VarType.LONG) {
- this.setProperty(v.getVarName(), v.getLongValue());
- } else if (v.getType() == ConfVars.VarType.INT) {
- this.setProperty(v.getVarName(), v.getIntValue());
- } else if (v.getType() == ConfVars.VarType.FLOAT) {
- this.setProperty(v.getVarName(), v.getFloatValue());
- } else if (v.getType() == ConfVars.VarType.STRING) {
- this.setProperty(v.getVarName(), v.getStringValue());
- } else {
- throw new RuntimeException("Unsupported VarType");
- }
- }
-
- }
-
-
- /**
- * Load from resource.
- *url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML);
- * @throws ConfigurationException
- */
- public static synchronized ZeppelinConfiguration create() {
- if (conf != null) {
- return conf;
- }
-
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- URL url;
-
- url = ZeppelinConfiguration.class.getResource(ZEPPELIN_SITE_XML);
- if (url == null) {
- ClassLoader cl = ZeppelinConfiguration.class.getClassLoader();
- if (cl != null) {
- url = cl.getResource(ZEPPELIN_SITE_XML);
- }
- }
- if (url == null) {
- url = classLoader.getResource(ZEPPELIN_SITE_XML);
- }
-
- if (url == null) {
- LOG.warn("Failed to load configuration, proceeding with a default");
- conf = new ZeppelinConfiguration();
- } else {
- try {
- LOG.info("Load configuration from " + url);
- conf = new ZeppelinConfiguration(url);
- } catch (ConfigurationException e) {
- LOG.warn("Failed to load configuration from " + url + " proceeding with a default", e);
- conf = new ZeppelinConfiguration();
- }
- }
-
- LOG.info("Server Host: " + conf.getServerAddress());
- if (conf.useSsl() == false) {
- LOG.info("Server Port: " + conf.getServerPort());
- } else {
- LOG.info("Server SSL Port: " + conf.getServerSslPort());
- }
- LOG.info("Context Path: " + conf.getServerContextPath());
- LOG.info("Zeppelin Version: " + Util.getVersion());
-
- return conf;
- }
-
-
- private String getStringValue(String name, String d) {
- List<ConfigurationNode> properties = getRootNode().getChildren();
- if (properties == null || properties.isEmpty()) {
- return d;
- }
- for (ConfigurationNode p : properties) {
- if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
- && name.equals(p.getChildren("name").get(0).getValue())) {
- return (String) p.getChildren("value").get(0).getValue();
- }
- }
- return d;
- }
-
- private int getIntValue(String name, int d) {
- List<ConfigurationNode> properties = getRootNode().getChildren();
- if (properties == null || properties.isEmpty()) {
- return d;
- }
- for (ConfigurationNode p : properties) {
- if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
- && name.equals(p.getChildren("name").get(0).getValue())) {
- return Integer.parseInt((String) p.getChildren("value").get(0).getValue());
- }
- }
- return d;
- }
-
- private long getLongValue(String name, long d) {
- List<ConfigurationNode> properties = getRootNode().getChildren();
- if (properties == null || properties.isEmpty()) {
- return d;
- }
- for (ConfigurationNode p : properties) {
- if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
- && name.equals(p.getChildren("name").get(0).getValue())) {
- return Long.parseLong((String) p.getChildren("value").get(0).getValue());
- }
- }
- return d;
- }
-
- private float getFloatValue(String name, float d) {
- List<ConfigurationNode> properties = getRootNode().getChildren();
- if (properties == null || properties.isEmpty()) {
- return d;
- }
- for (ConfigurationNode p : properties) {
- if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
- && name.equals(p.getChildren("name").get(0).getValue())) {
- return Float.parseFloat((String) p.getChildren("value").get(0).getValue());
- }
- }
- return d;
- }
-
- private boolean getBooleanValue(String name, boolean d) {
- List<ConfigurationNode> properties = getRootNode().getChildren();
- if (properties == null || properties.isEmpty()) {
- return d;
- }
- for (ConfigurationNode p : properties) {
- if (p.getChildren("name") != null && !p.getChildren("name").isEmpty()
- && name.equals(p.getChildren("name").get(0).getValue())) {
- return Boolean.parseBoolean((String) p.getChildren("value").get(0).getValue());
- }
- }
- return d;
- }
-
- public String getString(ConfVars c) {
- return getString(c.name(), c.getVarName(), c.getStringValue());
- }
-
- public String getString(String envName, String propertyName, String defaultValue) {
- if (System.getenv(envName) != null) {
- return System.getenv(envName);
- }
- if (System.getProperty(propertyName) != null) {
- return System.getProperty(propertyName);
- }
-
- return getStringValue(propertyName, defaultValue);
- }
-
- public int getInt(ConfVars c) {
- return getInt(c.name(), c.getVarName(), c.getIntValue());
- }
-
- public int getInt(String envName, String propertyName, int defaultValue) {
- if (System.getenv(envName) != null) {
- return Integer.parseInt(System.getenv(envName));
- }
-
- if (System.getProperty(propertyName) != null) {
- return Integer.parseInt(System.getProperty(propertyName));
- }
- return getIntValue(propertyName, defaultValue);
- }
-
- public long getLong(ConfVars c) {
- return getLong(c.name(), c.getVarName(), c.getLongValue());
- }
-
- public long getLong(String envName, String propertyName, long defaultValue) {
- if (System.getenv(envName) != null) {
- return Long.parseLong(System.getenv(envName));
- }
-
- if (System.getProperty(propertyName) != null) {
- return Long.parseLong(System.getProperty(propertyName));
- }
- return getLongValue(propertyName, defaultValue);
- }
-
- public float getFloat(ConfVars c) {
- return getFloat(c.name(), c.getVarName(), c.getFloatValue());
- }
-
- public float getFloat(String envName, String propertyName, float defaultValue) {
- if (System.getenv(envName) != null) {
- return Float.parseFloat(System.getenv(envName));
- }
- if (System.getProperty(propertyName) != null) {
- return Float.parseFloat(System.getProperty(propertyName));
- }
- return getFloatValue(propertyName, defaultValue);
- }
-
- public boolean getBoolean(ConfVars c) {
- return getBoolean(c.name(), c.getVarName(), c.getBooleanValue());
- }
-
- public boolean getBoolean(String envName, String propertyName, boolean defaultValue) {
- if (System.getenv(envName) != null) {
- return Boolean.parseBoolean(System.getenv(envName));
- }
-
- if (System.getProperty(propertyName) != null) {
- return Boolean.parseBoolean(System.getProperty(propertyName));
- }
- return getBooleanValue(propertyName, defaultValue);
- }
-
- public boolean useSsl() {
- return getBoolean(ConfVars.ZEPPELIN_SSL);
- }
-
- public int getServerSslPort() {
- return getInt(ConfVars.ZEPPELIN_SSL_PORT);
- }
-
- public boolean useClientAuth() {
- return getBoolean(ConfVars.ZEPPELIN_SSL_CLIENT_AUTH);
- }
-
- public String getServerAddress() {
- return getString(ConfVars.ZEPPELIN_ADDR);
- }
-
- public int getServerPort() {
- return getInt(ConfVars.ZEPPELIN_PORT);
- }
-
- public String getServerContextPath() {
- return getString(ConfVars.ZEPPELIN_SERVER_CONTEXT_PATH);
- }
-
- public String getKeyStorePath() {
- String path = getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PATH);
- if (path != null && path.startsWith("/") || isWindowsPath(path)) {
- return path;
- } else {
- return getRelativeDir(
- String.format("%s/%s",
- getConfDir(),
- path));
- }
- }
-
- public String getKeyStoreType() {
- return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_TYPE);
- }
-
- public String getKeyStorePassword() {
- return getString(ConfVars.ZEPPELIN_SSL_KEYSTORE_PASSWORD);
- }
-
- public String getKeyManagerPassword() {
- String password = getString(ConfVars.ZEPPELIN_SSL_KEY_MANAGER_PASSWORD);
- if (password == null) {
- return getKeyStorePassword();
- } else {
- return password;
- }
- }
-
- public String getTrustStorePath() {
- String path = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PATH);
- if (path == null) {
- path = getKeyStorePath();
- }
- if (path != null && path.startsWith("/") || isWindowsPath(path)) {
- return path;
- } else {
- return getRelativeDir(
- String.format("%s/%s",
- getConfDir(),
- path));
- }
- }
-
- public String getTrustStoreType() {
- String type = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_TYPE);
- if (type == null) {
- return getKeyStoreType();
- } else {
- return type;
- }
- }
-
- public String getTrustStorePassword() {
- String password = getString(ConfVars.ZEPPELIN_SSL_TRUSTSTORE_PASSWORD);
- if (password == null) {
- return getKeyStorePassword();
- } else {
- return password;
- }
- }
-
- public String getNotebookDir() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
- }
-
- public String getUser() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER);
- }
-
- public String getBucketName() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_BUCKET);
- }
-
- public String getEndpoint() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_ENDPOINT);
- }
-
- public String getS3KMSKeyID() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID);
- }
-
- public String getS3KMSKeyRegion() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION);
- }
-
- public String getS3EncryptionMaterialsProviderClass() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_EMP);
- }
-
- public boolean isS3ServerSideEncryption() {
- return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_S3_SSE);
- }
-
- public String getMongoUri() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI);
- }
-
- public String getMongoDatabase() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_DATABASE);
- }
-
- public String getMongoCollection() {
- return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_COLLECTION);
- }
-
- public boolean getMongoAutoimport() {
- return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT);
- }
-
- public String getInterpreterListPath() {
- return getRelativeDir(String.format("%s/interpreter-list", getConfDir()));
- }
-
- public String getInterpreterDir() {
- return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_DIR);
- }
-
- public String getInterpreterJson() {
- return getString(ConfVars.ZEPPELIN_INTERPRETER_JSON);
- }
-
- public String getInterpreterSettingPath() {
- return getRelativeDir(String.format("%s/interpreter.json", getConfDir()));
- }
-
- public String getHeliumConfPath() {
- return getRelativeDir(String.format("%s/helium.json", getConfDir()));
- }
-
- public String getHeliumRegistry() {
- return getRelativeDir(ConfVars.ZEPPELIN_HELIUM_REGISTRY);
- }
-
- public String getHeliumNodeInstallerUrl() {
- return getString(ConfVars.ZEPPELIN_HELIUM_NODE_INSTALLER_URL);
- }
-
- public String getHeliumNpmInstallerUrl() {
- return getString(ConfVars.ZEPPELIN_HELIUM_NPM_INSTALLER_URL);
- }
-
- public String getHeliumYarnInstallerUrl() {
- return getString(ConfVars.ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL);
- }
-
- public String getNotebookAuthorizationPath() {
- return getRelativeDir(String.format("%s/notebook-authorization.json", getConfDir()));
- }
-
- public Boolean credentialsPersist() {
- return getBoolean(ConfVars.ZEPPELIN_CREDENTIALS_PERSIST);
- }
-
- public String getCredentialsEncryptKey() {
- return getString(ConfVars.ZEPPELIN_CREDENTIALS_ENCRYPT_KEY);
- }
-
- public String getCredentialsPath() {
- return getRelativeDir(String.format("%s/credentials.json", getConfDir()));
- }
-
- public String getShiroPath() {
- String shiroPath = getRelativeDir(String.format("%s/shiro.ini", getConfDir()));
- return new File(shiroPath).exists() ? shiroPath : StringUtils.EMPTY;
- }
-
- public String getInterpreterRemoteRunnerPath() {
- return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER);
- }
-
- public String getInterpreterLocalRepoPath() {
- return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO);
- }
-
- public String getInterpreterMvnRepoPath() {
- return getString(ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO);
- }
-
- public String getRelativeDir(ConfVars c) {
- return getRelativeDir(getString(c));
- }
-
- public String getRelativeDir(String path) {
- if (path != null && path.startsWith("/") || isWindowsPath(path)) {
- return path;
- } else {
- return getString(ConfVars.ZEPPELIN_HOME) + "/" + path;
- }
- }
-
- public String getCallbackPortRange() {
- return getString(ConfVars.ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE);
- }
-
- public boolean isWindowsPath(String path){
- return path.matches("^[A-Za-z]:\\\\.*");
- }
-
- public boolean isAnonymousAllowed() {
- return getBoolean(ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED);
- }
-
- public boolean isNotebokPublic() {
- return getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC);
- }
-
- public String getConfDir() {
- return getRelativeDir(ConfVars.ZEPPELIN_CONF_DIR);
- }
-
- public List<String> getAllowedOrigins()
- {
- if (getString(ConfVars.ZEPPELIN_ALLOWED_ORIGINS).isEmpty()) {
- return Arrays.asList(new String[0]);
- }
-
- return Arrays.asList(getString(ConfVars.ZEPPELIN_ALLOWED_ORIGINS).toLowerCase().split(","));
- }
-
- public String getWebsocketMaxTextMessageSize() {
- return getString(ConfVars.ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE);
- }
-
- public String getJettyName() {
- return getString(ConfVars.ZEPPELIN_SERVER_JETTY_NAME);
- }
-
-
- public String getXFrameOptions() {
- return getString(ConfVars.ZEPPELIN_SERVER_XFRAME_OPTIONS);
- }
-
- public String getXxssProtection() {
- return getString(ConfVars.ZEPPELIN_SERVER_X_XSS_PROTECTION);
- }
-
- public String getStrictTransport() {
- return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT);
- }
-
-
- public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf,
- ConfigurationKeyPredicate predicate) {
- Map<String, String> configurations = new HashMap<>();
-
- for (ConfVars v : ConfVars.values()) {
- String key = v.getVarName();
-
- if (!predicate.apply(key)) {
- continue;
- }
-
- ConfVars.VarType type = v.getType();
- Object value = null;
- if (type == ConfVars.VarType.BOOLEAN) {
- value = conf.getBoolean(v);
- } else if (type == ConfVars.VarType.LONG) {
- value = conf.getLong(v);
- } else if (type == ConfVars.VarType.INT) {
- value = conf.getInt(v);
- } else if (type == ConfVars.VarType.FLOAT) {
- value = conf.getFloat(v);
- } else if (type == ConfVars.VarType.STRING) {
- value = conf.getString(v);
- }
-
- if (value != null) {
- configurations.put(key, value.toString());
- }
- }
- return configurations;
- }
-
- /**
- * Predication whether key/value pair should be included or not
- */
- public interface ConfigurationKeyPredicate {
- boolean apply(String key);
- }
-
- /**
- * Wrapper class.
- */
- public static enum ConfVars {
- ZEPPELIN_HOME("zeppelin.home", "./"),
- ZEPPELIN_ADDR("zeppelin.server.addr", "0.0.0.0"),
- ZEPPELIN_PORT("zeppelin.server.port", 8080),
- ZEPPELIN_SERVER_CONTEXT_PATH("zeppelin.server.context.path", "/"),
- ZEPPELIN_SSL("zeppelin.ssl", false),
- ZEPPELIN_SSL_PORT("zeppelin.server.ssl.port", 8443),
- ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false),
- ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "keystore"),
- ZEPPELIN_SSL_KEYSTORE_TYPE("zeppelin.ssl.keystore.type", "JKS"),
- ZEPPELIN_SSL_KEYSTORE_PASSWORD("zeppelin.ssl.keystore.password", ""),
- ZEPPELIN_SSL_KEY_MANAGER_PASSWORD("zeppelin.ssl.key.manager.password", null),
- ZEPPELIN_SSL_TRUSTSTORE_PATH("zeppelin.ssl.truststore.path", null),
- ZEPPELIN_SSL_TRUSTSTORE_TYPE("zeppelin.ssl.truststore.type", null),
- ZEPPELIN_SSL_TRUSTSTORE_PASSWORD("zeppelin.ssl.truststore.password", null),
- ZEPPELIN_WAR("zeppelin.war", "zeppelin-web/dist"),
- ZEPPELIN_WAR_TEMPDIR("zeppelin.war.tempdir", "webapps"),
- ZEPPELIN_INTERPRETERS("zeppelin.interpreters", "org.apache.zeppelin.spark.SparkInterpreter,"
- + "org.apache.zeppelin.spark.PySparkInterpreter,"
- + "org.apache.zeppelin.rinterpreter.RRepl,"
- + "org.apache.zeppelin.rinterpreter.KnitR,"
- + "org.apache.zeppelin.spark.SparkRInterpreter,"
- + "org.apache.zeppelin.spark.SparkSqlInterpreter,"
- + "org.apache.zeppelin.spark.DepInterpreter,"
- + "org.apache.zeppelin.markdown.Markdown,"
- + "org.apache.zeppelin.angular.AngularInterpreter,"
- + "org.apache.zeppelin.shell.ShellInterpreter,"
- + "org.apache.zeppelin.livy.LivySparkInterpreter,"
- + "org.apache.zeppelin.livy.LivySparkSQLInterpreter,"
- + "org.apache.zeppelin.livy.LivyPySparkInterpreter,"
- + "org.apache.zeppelin.livy.LivyPySpark3Interpreter,"
- + "org.apache.zeppelin.livy.LivySparkRInterpreter,"
- + "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
- + "org.apache.zeppelin.file.HDFSFileInterpreter,"
- + "org.apache.zeppelin.pig.PigInterpreter,"
- + "org.apache.zeppelin.pig.PigQueryInterpreter,"
- + "org.apache.zeppelin.flink.FlinkInterpreter,"
- + "org.apache.zeppelin.python.PythonInterpreter,"
- + "org.apache.zeppelin.python.PythonInterpreterPandasSql,"
- + "org.apache.zeppelin.python.PythonCondaInterpreter,"
- + "org.apache.zeppelin.python.PythonDockerInterpreter,"
- + "org.apache.zeppelin.ignite.IgniteInterpreter,"
- + "org.apache.zeppelin.ignite.IgniteSqlInterpreter,"
- + "org.apache.zeppelin.lens.LensInterpreter,"
- + "org.apache.zeppelin.cassandra.CassandraInterpreter,"
- + "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
- + "org.apache.zeppelin.kylin.KylinInterpreter,"
- + "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
- + "org.apache.zeppelin.scalding.ScaldingInterpreter,"
- + "org.apache.zeppelin.jdbc.JDBCInterpreter,"
- + "org.apache.zeppelin.hbase.HbaseInterpreter,"
- + "org.apache.zeppelin.bigquery.BigQueryInterpreter,"
- + "org.apache.zeppelin.beam.BeamInterpreter,"
- + "org.apache.zeppelin.scio.ScioInterpreter,"
- + "org.apache.zeppelin.groovy.GroovyInterpreter,"
- + "org.apache.zeppelin.neo4j.Neo4jCypherInterpreter"
- ),
- ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"),
- ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
- ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
- ZEPPELIN_INTERPRETER_DEP_MVNREPO("zeppelin.interpreter.dep.mvnRepo",
- "http://repo1.maven.org/maven2/"),
- ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
- ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
- ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
- + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
- + "scalding,jdbc,hbase,bigquery,beam,pig,scio,groovy,neo4j"),
- ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
- ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
- ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
- // use specified notebook (id) as homescreen
- ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),
- // whether homescreen notebook will be hidden from notebook list or not
- ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE("zeppelin.notebook.homescreen.hide", false),
- ZEPPELIN_NOTEBOOK_S3_BUCKET("zeppelin.notebook.s3.bucket", "zeppelin"),
- ZEPPELIN_NOTEBOOK_S3_ENDPOINT("zeppelin.notebook.s3.endpoint", "s3.amazonaws.com"),
- ZEPPELIN_NOTEBOOK_S3_USER("zeppelin.notebook.s3.user", "user"),
- ZEPPELIN_NOTEBOOK_S3_EMP("zeppelin.notebook.s3.encryptionMaterialsProvider", null),
- ZEPPELIN_NOTEBOOK_S3_KMS_KEY_ID("zeppelin.notebook.s3.kmsKeyID", null),
- ZEPPELIN_NOTEBOOK_S3_KMS_KEY_REGION("zeppelin.notebook.s3.kmsKeyRegion", null),
- ZEPPELIN_NOTEBOOK_S3_SSE("zeppelin.notebook.s3.sse", false),
- ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null),
- ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"),
- ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"),
- ZEPPELIN_NOTEBOOK_MONGO_DATABASE("zeppelin.notebook.mongo.database", "zeppelin"),
- ZEPPELIN_NOTEBOOK_MONGO_COLLECTION("zeppelin.notebook.mongo.collection", "notes"),
- ZEPPELIN_NOTEBOOK_MONGO_URI("zeppelin.notebook.mongo.uri", "mongodb://localhost"),
- ZEPPELIN_NOTEBOOK_MONGO_AUTOIMPORT("zeppelin.notebook.mongo.autoimport", false),
- ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage",
- "org.apache.zeppelin.notebook.repo.GitNotebookRepo"),
- ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false),
- // whether by default note is public or private
- ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true),
- ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner",
- System.getProperty("os.name")
- .startsWith("Windows") ? "bin/interpreter.cmd" : "bin/interpreter.sh"),
- // Decide when new note is created, interpreter settings will be binded automatically or not.
- ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true),
- ZEPPELIN_CONF_DIR("zeppelin.conf.dir", "conf"),
- ZEPPELIN_DEP_LOCALREPO("zeppelin.dep.localrepo", "local-repo"),
- ZEPPELIN_HELIUM_REGISTRY("zeppelin.helium.registry", "helium," + HELIUM_PACKAGE_DEFAULT_URL),
- ZEPPELIN_HELIUM_NODE_INSTALLER_URL("zeppelin.helium.node.installer.url",
- "https://nodejs.org/dist/"),
- ZEPPELIN_HELIUM_NPM_INSTALLER_URL("zeppelin.helium.npm.installer.url",
- "http://registry.npmjs.org/"),
- ZEPPELIN_HELIUM_YARNPKG_INSTALLER_URL("zeppelin.helium.yarnpkg.installer.url",
- "https://github.com/yarnpkg/yarn/releases/download/"),
- // Allows a way to specify a ',' separated list of allowed origins for rest and websockets
- // i.e. http://localhost:8080
- ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"),
- ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true),
- ZEPPELIN_CREDENTIALS_PERSIST("zeppelin.credentials.persist", true),
- ZEPPELIN_CREDENTIALS_ENCRYPT_KEY("zeppelin.credentials.encryptKey", null),
- ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"),
- ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED("zeppelin.server.default.dir.allowed", false),
- ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"),
- ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", null),
- ZEPPELIN_SERVER_STRICT_TRANSPORT("zeppelin.server.strict.transport", "max-age=631138519"),
- ZEPPELIN_SERVER_X_XSS_PROTECTION("zeppelin.server.xxss.protection", "1"),
-
- ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""),
- ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""),
-
- ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":");
-
- private String varName;
- @SuppressWarnings("rawtypes")
- private Class varClass;
- private String stringValue;
- private VarType type;
- private int intValue;
- private float floatValue;
- private boolean booleanValue;
- private long longValue;
-
-
- ConfVars(String varName, String varValue) {
- this.varName = varName;
- this.varClass = String.class;
- this.stringValue = varValue;
- this.intValue = -1;
- this.floatValue = -1;
- this.longValue = -1;
- this.booleanValue = false;
- this.type = VarType.STRING;
- }
-
- ConfVars(String varName, int intValue) {
- this.varName = varName;
- this.varClass = Integer.class;
- this.stringValue = null;
- this.intValue = intValue;
- this.floatValue = -1;
- this.longValue = -1;
- this.booleanValue = false;
- this.type = VarType.INT;
- }
-
- ConfVars(String varName, long longValue) {
- this.varName = varName;
- this.varClass = Integer.class;
- this.stringValue = null;
- this.intValue = -1;
- this.floatValue = -1;
- this.longValue = longValue;
- this.booleanValue = false;
- this.type = VarType.LONG;
- }
-
- ConfVars(String varName, float floatValue) {
- this.varName = varName;
- this.varClass = Float.class;
- this.stringValue = null;
- this.intValue = -1;
- this.longValue = -1;
- this.floatValue = floatValue;
- this.booleanValue = false;
- this.type = VarType.FLOAT;
- }
-
- ConfVars(String varName, boolean booleanValue) {
- this.varName = varName;
- this.varClass = Boolean.class;
- this.stringValue = null;
- this.intValue = -1;
- this.longValue = -1;
- this.floatValue = -1;
- this.booleanValue = booleanValue;
- this.type = VarType.BOOLEAN;
- }
-
- public String getVarName() {
- return varName;
- }
-
- @SuppressWarnings("rawtypes")
- public Class getVarClass() {
- return varClass;
- }
-
- public int getIntValue() {
- return intValue;
- }
-
- public long getLongValue() {
- return longValue;
- }
-
- public float getFloatValue() {
- return floatValue;
- }
-
- public String getStringValue() {
- return stringValue;
- }
-
- public boolean getBooleanValue() {
- return booleanValue;
- }
-
- public VarType getType() {
- return type;
- }
-
- enum VarType {
- STRING {
- @Override
- void checkType(String value) throws Exception {}
- },
- INT {
- @Override
- void checkType(String value) throws Exception {
- Integer.valueOf(value);
- }
- },
- LONG {
- @Override
- void checkType(String value) throws Exception {
- Long.valueOf(value);
- }
- },
- FLOAT {
- @Override
- void checkType(String value) throws Exception {
- Float.valueOf(value);
- }
- },
- BOOLEAN {
- @Override
- void checkType(String value) throws Exception {
- Boolean.valueOf(value);
- }
- };
-
- boolean isType(String value) {
- try {
- checkType(value);
- } catch (Exception e) {
- LOG.error("Exception in ZeppelinConfiguration while isType", e);
- return false;
- }
- return true;
- }
-
- String typeString() {
- return name().toUpperCase();
- }
-
- abstract void checkType(String value) throws Exception;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index f020919..7233239 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -77,7 +77,7 @@ public class InterpreterFactory {
return interpreter;
}
}
- throw new InterpreterException(replName + " interpreter not found");
+ return null;
} else {
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 5af01dc..a82d5bf 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -18,7 +18,6 @@
package org.apache.zeppelin.interpreter;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.gson.JsonArray;
@@ -34,19 +33,22 @@ import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext;
+import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
+import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
+import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FilenameFilter;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
@@ -58,7 +60,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -132,6 +133,10 @@ public class InterpreterSetting {
private transient ZeppelinConfiguration conf = new ZeppelinConfiguration();
+ // TODO(zjffdu) ShellScriptLauncher is the only launcher implementation for now. It could be
+ // another launcher in the future when we have other launcher implementations, e.g. a third-party
+ // launcher service like Livy.
+ private transient InterpreterLauncher launcher;
///////////////////////////////////////////////////////////////////////////////////////////
@@ -243,6 +248,7 @@ public class InterpreterSetting {
}
void postProcessing() {
+// createLauncher();
this.status = Status.READY;
}
@@ -266,6 +272,14 @@ public class InterpreterSetting {
this.conf = o.getConf();
}
+ private void createLauncher() {
+ if (group.equals("spark")) {
+ this.launcher = new SparkInterpreterLauncher(this.conf);
+ } else {
+ this.launcher = new ShellScriptLauncher(this.conf);
+ }
+ }
+
public AngularObjectRegistryListener getAngularObjectRegistryListener() {
return angularObjectRegistryListener;
}
@@ -626,152 +640,17 @@ public class InterpreterSetting {
}
return interpreters;
}
-
- RemoteInterpreterProcess createInterpreterProcess() {
- RemoteInterpreterProcess remoteInterpreterProcess = null;
- int connectTimeout =
- conf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
- String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + id;
- if (option.isExistingProcess()) {
- // TODO(zjffdu) remove the existing process approach seems no one is using this.
- // use the existing process
- remoteInterpreterProcess = new RemoteInterpreterRunningProcess(
- connectTimeout,
- remoteInterpreterProcessListener,
- appEventListener,
- option.getHost(),
- option.getPort());
- } else {
- // create new remote process
- remoteInterpreterProcess = new RemoteInterpreterManagedProcess(
- interpreterRunner != null ? interpreterRunner.getPath() :
- conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
- interpreterDir, localRepoPath,
- getEnvFromInterpreterProperty(), connectTimeout,
- remoteInterpreterProcessListener, appEventListener, group);
- }
- return remoteInterpreterProcess;
- }
-
- private boolean isSparkConf(String key, String value) {
- return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
- }
-
- private Map<String, String> getEnvFromInterpreterProperty() {
- Map<String, String> env = new HashMap<String, String>();
- Properties javaProperties = getJavaProperties();
- Properties sparkProperties = new Properties();
- String sparkMaster = getSparkMaster();
- for (String key : javaProperties.stringPropertyNames()) {
- if (RemoteInterpreterUtils.isEnvString(key)) {
- env.put(key, javaProperties.getProperty(key));
- }
- if (isSparkConf(key, javaProperties.getProperty(key))) {
- sparkProperties.setProperty(key, toShellFormat(javaProperties.getProperty(key)));
- }
- }
-
- setupPropertiesForPySpark(sparkProperties);
- setupPropertiesForSparkR(sparkProperties, System.getenv("SPARK_HOME"));
- if (isYarnMode() && getDeployMode().equals("cluster")) {
- env.put("SPARK_YARN_CLUSTER", "true");
- }
-
- StringBuilder sparkConfBuilder = new StringBuilder();
- if (sparkMaster != null) {
- sparkConfBuilder.append(" --master " + sparkMaster);
- }
- if (isYarnMode() && getDeployMode().equals("cluster")) {
- sparkConfBuilder.append(" --files " + conf.getConfDir() + "/log4j_yarn_cluster.properties");
- }
- for (String name : sparkProperties.stringPropertyNames()) {
- sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
- }
-
- env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
- LOGGER.debug("getEnvFromInterpreterProperty: " + env);
- return env;
- }
-
- private void setupPropertiesForPySpark(Properties sparkProperties) {
- if (isYarnMode()) {
- sparkProperties.setProperty("spark.yarn.isPython", "true");
- }
- }
- private void mergeSparkProperty(Properties sparkProperties, String propertyName,
- String propertyValue) {
- if (sparkProperties.containsKey(propertyName)) {
- String oldPropertyValue = sparkProperties.getProperty(propertyName);
- sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
- } else {
- sparkProperties.setProperty(propertyName, propertyValue);
- }
- }
-
- private void setupPropertiesForSparkR(Properties sparkProperties,
- String sparkHome) {
- File sparkRBasePath = null;
- if (sparkHome == null) {
- if (!getSparkMaster().startsWith("local")) {
- throw new RuntimeException("SPARK_HOME is not specified for non-local mode");
- }
- String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
- sparkRBasePath = new File(zeppelinHome,
- "interpreter" + File.separator + "spark" + File.separator + "R");
- } else {
- sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
- }
-
- File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
- if (sparkRPath.exists() && sparkRPath.isFile()) {
- mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
- } else {
- LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
- }
- }
-
- private String getSparkMaster() {
- String master = getJavaProperties().getProperty("master");
- if (master == null) {
- master = getJavaProperties().getProperty("spark.master", "local[*]");
- }
- return master;
- }
-
- private String getDeployMode() {
- String master = getSparkMaster();
- if (master.equals("yarn-client")) {
- return "client";
- } else if (master.equals("yarn-cluster")) {
- return "cluster";
- } else if (master.startsWith("local")) {
- return "client";
- } else {
- String deployMode = getJavaProperties().getProperty("spark.submit.deployMode");
- if (deployMode == null) {
- throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
- "is not specified");
- }
- if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
- throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
- }
- return deployMode;
- }
- }
-
- private boolean isYarnMode() {
- return getSparkMaster().startsWith("yarn");
- }
-
- private String toShellFormat(String value) {
- if (value.contains("\'") && value.contains("\"")) {
- throw new RuntimeException("Spark property value could not contain both \" and '");
- } else if (value.contains("\'")) {
- return "\"" + value + "\"";
- } else {
- return "\'" + value + "\'";
+ synchronized RemoteInterpreterProcess createInterpreterProcess() throws IOException {
+ if (launcher == null) {
+ createLauncher();
}
+ InterpreterLaunchContext launchContext = new
+ InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, name);
+ RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
+ process.setRemoteInterpreterEventPoller(
+ new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
+ return process;
}
private List<Interpreter> getOrCreateSession(String user, String noteId) {
@@ -815,8 +694,7 @@ public class InterpreterSetting {
return null;
}
- private ManagedInterpreterGroup createInterpreterGroup(String groupId)
- throws InterpreterException {
+ private ManagedInterpreterGroup createInterpreterGroup(String groupId) {
AngularObjectRegistry angularObjectRegistry;
ManagedInterpreterGroup interpreterGroup = new ManagedInterpreterGroup(groupId, this);
angularObjectRegistry =
@@ -938,7 +816,8 @@ public class InterpreterSetting {
);
newProperties.put(key, property);
} else {
- throw new RuntimeException("Can not convert this type of property: " + value.getClass());
+ throw new RuntimeException("Can not convert this type of property: " +
+ value.getClass());
}
}
return newProperties;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 9dfce21..f34195d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -744,11 +744,12 @@ public class InterpreterSettingManager {
}
/**
- * Change interpreter property and restart
+ * Change interpreter properties and restart
*/
public void setPropertyAndRestart(String id, InterpreterOption option,
Map<String, InterpreterProperty> properties,
- List<Dependency> dependencies) throws IOException {
+ List<Dependency> dependencies)
+ throws InterpreterException, IOException {
synchronized (interpreterSettings) {
InterpreterSetting intpSetting = interpreterSettings.get(id);
if (intpSetting != null) {
@@ -761,7 +762,7 @@ public class InterpreterSettingManager {
saveToFile();
} catch (Exception e) {
loadFromFile();
- throw e;
+ throw new IOException(e);
}
} else {
throw new InterpreterException("Interpreter setting id " + id + " not found");
@@ -770,7 +771,7 @@ public class InterpreterSettingManager {
}
// restart in note page
- public void restart(String settingId, String noteId, String user) {
+ public void restart(String settingId, String noteId, String user) throws InterpreterException {
InterpreterSetting intpSetting = interpreterSettings.get(settingId);
Preconditions.checkNotNull(intpSetting);
synchronized (interpreterSettings) {
@@ -794,7 +795,7 @@ public class InterpreterSettingManager {
}
}
- public void restart(String id) {
+ public void restart(String id) throws InterpreterException {
restart(id, "", "anonymous");
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
index 1d7d916..ff9cb1c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java
@@ -25,6 +25,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -52,7 +53,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
return interpreterSetting;
}
- public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
+ public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
if (remoteInterpreterProcess == null) {
LOGGER.info("Create InterperterProcess for InterpreterGroup: " + getId());
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess();
@@ -112,7 +113,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
LOGGER.info("Job " + job.getJobName() + " aborted ");
}
- interpreter.close();
+ try {
+ interpreter.close();
+ } catch (InterpreterException e) {
+ LOGGER.warn("Fail to close interpreter " + interpreter.getClassName(), e);
+ }
//TODO(zjffdu) move the close of schedule to Interpreter
if (null != scheduler) {
SchedulerFactory.singleton().removeScheduler(scheduler.getName());
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
new file mode 100644
index 0000000..f419967
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterRunner;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Interpreter launcher which uses a shell script to launch the interpreter process.
+ *
+ */
+public class ShellScriptLauncher extends InterpreterLauncher {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class);
+
+ public ShellScriptLauncher(ZeppelinConfiguration zConf) {
+ super(zConf);
+ }
+
+ @Override
+ public InterpreterClient launch(InterpreterLaunchContext context) {
+ LOGGER.info("Launching Interpreter: " + context.getInterpreterGroupName());
+ this.properties = context.getProperties();
+ InterpreterOption option = context.getOption();
+ InterpreterRunner runner = context.getRunner();
+ String groupName = context.getInterpreterGroupName();
+
+ int connectTimeout =
+ zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+ if (option.isExistingProcess()) {
+ return new RemoteInterpreterRunningProcess(
+ connectTimeout,
+ option.getHost(),
+ option.getPort());
+ } else {
+ // create new remote process
+ String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+ + context.getInterpreterGroupId();
+ return new RemoteInterpreterManagedProcess(
+ runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
+ zConf.getCallbackPortRange(),
+ zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
+ buildEnvFromProperties(), connectTimeout, groupName);
+ }
+ }
+
+ protected Map<String, String> buildEnvFromProperties() {
+ Map<String, String> env = new HashMap<>();
+ for (Object key : properties.keySet()) {
+ if (RemoteInterpreterUtils.isEnvString((String) key)) {
+ env.put((String) key, properties.getProperty((String) key));
+ }
+ }
+ return env;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
new file mode 100644
index 0000000..32a0530
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Spark specific launcher.
+ */
+public class SparkInterpreterLauncher extends ShellScriptLauncher {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
+
+ public SparkInterpreterLauncher(ZeppelinConfiguration zConf) {
+ super(zConf);
+ }
+
+ @Override
+ protected Map<String, String> buildEnvFromProperties() {
+ Map<String, String> env = new HashMap<String, String>();
+ Properties sparkProperties = new Properties();
+ String sparkMaster = getSparkMaster(properties);
+ for (String key : properties.stringPropertyNames()) {
+ if (RemoteInterpreterUtils.isEnvString(key)) {
+ env.put(key, properties.getProperty(key));
+ }
+ if (isSparkConf(key, properties.getProperty(key))) {
+ sparkProperties.setProperty(key, toShellFormat(properties.getProperty(key)));
+ }
+ }
+
+ setupPropertiesForPySpark(sparkProperties);
+ setupPropertiesForSparkR(sparkProperties);
+ if (isYarnMode() && getDeployMode().equals("cluster")) {
+ env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
+ }
+
+ StringBuilder sparkConfBuilder = new StringBuilder();
+ if (sparkMaster != null) {
+ sparkConfBuilder.append(" --master " + sparkMaster);
+ }
+ if (isYarnMode() && getDeployMode().equals("cluster")) {
+ sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_yarn_cluster.properties");
+ }
+ for (String name : sparkProperties.stringPropertyNames()) {
+ sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
+ }
+
+ env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+
+ // set these env in the order of
+ // 1. interpreter-setting
+ // 2. zeppelin-env.sh
+ // It is encouraged to set env in interpreter setting, but just for backward compatibility,
+ // we also fall back to zeppelin-env.sh if it is not specified in interpreter setting.
+ for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) {
+ String envValue = getEnv(envName);
+ if (envValue != null) {
+ env.put(envName, envValue);
+ }
+ }
+ LOGGER.debug("buildEnvFromProperties: " + env);
+ return env;
+
+ }
+
+
+ /**
+ * Get an environment variable, looked up in the following order:
+ *
+ * 1. interpreter setting
+ * 2. zeppelin-env.sh
+ *
+ */
+ private String getEnv(String envName) {
+ String env = properties.getProperty(envName);
+ if (env == null) {
+ env = System.getenv(envName);
+ }
+ return env;
+ }
+
+ private boolean isSparkConf(String key, String value) {
+ return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
+ }
+
+ private void setupPropertiesForPySpark(Properties sparkProperties) {
+ if (isYarnMode()) {
+ sparkProperties.setProperty("spark.yarn.isPython", "true");
+ }
+ }
+
+ private void mergeSparkProperty(Properties sparkProperties, String propertyName,
+ String propertyValue) {
+ if (sparkProperties.containsKey(propertyName)) {
+ String oldPropertyValue = sparkProperties.getProperty(propertyName);
+ sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
+ } else {
+ sparkProperties.setProperty(propertyName, propertyValue);
+ }
+ }
+
+ private void setupPropertiesForSparkR(Properties sparkProperties) {
+ String sparkHome = getEnv("SPARK_HOME");
+ File sparkRBasePath = null;
+ if (sparkHome == null) {
+ if (!getSparkMaster(properties).startsWith("local")) {
+ throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
+ " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
+ " interpreter setting");
+ }
+ String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
+ sparkRBasePath = new File(zeppelinHome,
+ "interpreter" + File.separator + "spark" + File.separator + "R");
+ } else {
+ sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
+ }
+
+ File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
+ if (sparkRPath.exists() && sparkRPath.isFile()) {
+ mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
+ } else {
+ LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
+ }
+ }
+
+ /**
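+ * Order in which to look for the spark master:
+ * 1. master in interpreter setting
+ * 2. spark.master in interpreter setting
+ * 3. use local[*]
+ * @param properties interpreter properties to search for the spark master
+ * @return the configured spark master, or local[*] if neither property is set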
+ */
+ private String getSparkMaster(Properties properties) {
+ String master = properties.getProperty("master");
+ if (master == null) {
+ master = properties.getProperty("spark.master");
+ if (master == null) {
+ master = "local[*]";
+ }
+ }
+ return master;
+ }
+
+ private String getDeployMode() {
+ String master = getSparkMaster(properties);
+ if (master.equals("yarn-client")) {
+ return "client";
+ } else if (master.equals("yarn-cluster")) {
+ return "cluster";
+ } else if (master.startsWith("local")) {
+ return "client";
+ } else {
+ String deployMode = properties.getProperty("spark.submit.deployMode");
+ if (deployMode == null) {
+ throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
+ "is not specified");
+ }
+ if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
+ throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
+ }
+ return deployMode;
+ }
+ }
+
+ private boolean isYarnMode() {
+ return getSparkMaster(properties).startsWith("yarn");
+ }
+
+ private String toShellFormat(String value) {
+ if (value.contains("\'") && value.contains("\"")) {
+ throw new RuntimeException("Spark property value could not contain both \" and '");
+ } else if (value.contains("\'")) {
+ return "\"" + value + "\"";
+ } else {
+ return "\'" + value + "\'";
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
index 064abd5..7653824 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
@@ -82,7 +82,7 @@ public class InterpreterContextRunnerPool {
}
}
- throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId);
+ throw new RuntimeException("Can not run paragraph " + paragraphId + " on " + noteId);
}
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 54bf9e1..b479799 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -28,6 +28,7 @@ import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -42,6 +43,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -90,7 +92,7 @@ public class RemoteInterpreter extends Interpreter {
return this.sessionId;
}
- public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
+ public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
if (this.interpreterProcess != null) {
return this.interpreterProcess;
}
@@ -113,7 +115,7 @@ public class RemoteInterpreter extends Interpreter {
}
@Override
- public void open() {
+ public void open() throws InterpreterException {
synchronized (this) {
if (!isOpened) {
// create all the interpreters of the same session first, then Open the internal interpreter
@@ -123,7 +125,11 @@ public class RemoteInterpreter extends Interpreter {
// also see method Interpreter.getInterpreterInTheSameSessionByClassName
for (Interpreter interpreter : getInterpreterGroup()
.getOrCreateSession(userName, sessionId)) {
- ((RemoteInterpreter) interpreter).internal_create();
+ try {
+ ((RemoteInterpreter) interpreter).internal_create();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
}
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@@ -147,7 +153,7 @@ public class RemoteInterpreter extends Interpreter {
}
}
- private void internal_create() {
+ private void internal_create() throws IOException {
synchronized (this) {
if (!isCreated) {
RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
@@ -156,7 +162,7 @@ public class RemoteInterpreter extends Interpreter {
public Void call(Client client) throws Exception {
LOGGER.info("Create RemoteInterpreter {}", getClassName());
client.createInterpreter(getInterpreterGroup().getId(), sessionId,
- className, (Map) property, userName);
+ className, (Map) getProperties(), userName);
return null;
}
});
@@ -167,9 +173,14 @@ public class RemoteInterpreter extends Interpreter {
@Override
- public void close() {
+ public void close() throws InterpreterException {
if (isOpened) {
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ RemoteInterpreterProcess interpreterProcess = null;
+ try {
+ interpreterProcess = getOrCreateInterpreterProcess();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
@@ -184,13 +195,19 @@ public class RemoteInterpreter extends Interpreter {
}
@Override
- public InterpreterResult interpret(final String st, final InterpreterContext context) {
+ public InterpreterResult interpret(final String st, final InterpreterContext context)
+ throws InterpreterException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("st:\n{}", st);
}
final FormType form = getFormType();
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ RemoteInterpreterProcess interpreterProcess = null;
+ try {
+ interpreterProcess = getOrCreateInterpreterProcess();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
.getInterpreterContextRunnerPool();
List<InterpreterContextRunner> runners = context.getRunners();
@@ -238,12 +255,17 @@ public class RemoteInterpreter extends Interpreter {
}
@Override
- public void cancel(final InterpreterContext context) {
+ public void cancel(final InterpreterContext context) throws InterpreterException {
if (!isOpened) {
LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className);
return;
}
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ RemoteInterpreterProcess interpreterProcess = null;
+ try {
+ interpreterProcess = getOrCreateInterpreterProcess();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
@@ -254,7 +276,7 @@ public class RemoteInterpreter extends Interpreter {
}
@Override
- public FormType getFormType() {
+ public FormType getFormType() throws InterpreterException {
if (formType != null) {
return formType;
}
@@ -265,7 +287,12 @@ public class RemoteInterpreter extends Interpreter {
open();
}
}
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ RemoteInterpreterProcess interpreterProcess = null;
+ try {
+ interpreterProcess = getOrCreateInterpreterProcess();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
FormType type = interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<FormType>() {
@Override
@@ -277,13 +304,19 @@ public class RemoteInterpreter extends Interpreter {
return type;
}
+
@Override
- public int getProgress(final InterpreterContext context) {
+ public int getProgress(final InterpreterContext context) throws InterpreterException {
if (!isOpened) {
LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className);
return 0;
}
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ RemoteInterpreterProcess interpreterProcess = null;
+ try {
+ interpreterProcess = getOrCreateInterpreterProcess();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<Integer>() {
@Override
@@ -296,12 +329,18 @@ public class RemoteInterpreter extends Interpreter {
@Override
public List<InterpreterCompletion> completion(final String buf, final int cursor,
- final InterpreterContext interpreterContext) {
+ final InterpreterContext interpreterContext)
+ throws InterpreterException {
if (!isOpened) {
LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className);
return new ArrayList<>();
}
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ RemoteInterpreterProcess interpreterProcess = null;
+ try {
+ interpreterProcess = getOrCreateInterpreterProcess();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
@Override
@@ -317,7 +356,12 @@ public class RemoteInterpreter extends Interpreter {
LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className);
return Job.Status.UNKNOWN.name();
}
- RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
+ RemoteInterpreterProcess interpreterProcess = null;
+ try {
+ interpreterProcess = getOrCreateInterpreterProcess();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<String>() {
@Override
@@ -331,7 +375,7 @@ public class RemoteInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
int maxConcurrency = Integer.parseInt(
- property.getProperty("zeppelin.interpreter.max.poolsize",
+ getProperty("zeppelin.interpreter.max.poolsize",
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + ""));
Scheduler s = new RemoteScheduler(
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index d21a962..6e26e58 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter.remote;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
@@ -73,11 +74,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
String localRepoDir,
Map<String, String> env,
int connectTimeout,
- RemoteInterpreterProcessListener listener,
- ApplicationEventListener appListener,
String interpreterGroupName) {
- super(new RemoteInterpreterEventPoller(listener, appListener),
- connectTimeout);
+ super(connectTimeout);
this.interpreterRunner = intpRunner;
this.portRange = portRange;
this.env = env;
@@ -86,23 +84,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
this.interpreterGroupName = interpreterGroupName;
}
- RemoteInterpreterManagedProcess(String intpRunner,
- String intpDir,
- String localRepoDir,
- Map<String, String> env,
- RemoteInterpreterEventPoller remoteInterpreterEventPoller,
- int connectTimeout,
- String interpreterGroupName) {
- super(remoteInterpreterEventPoller,
- connectTimeout);
- this.interpreterRunner = intpRunner;
- this.portRange = ":";
- this.env = env;
- this.interpreterDir = intpDir;
- this.localRepoDir = localRepoDir;
- this.interpreterGroupName = interpreterGroupName;
- }
-
@Override
public String getHost() {
return "localhost";
@@ -124,7 +105,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e1) {
- throw new InterpreterException(e1);
+ throw new RuntimeException(e1);
}
logger.info("Thrift server for callback will start. Port: {}", callbackPort);
@@ -206,7 +187,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
executor.execute(cmdLine, procEnv, this);
} catch (IOException e) {
running.set(false);
- throw new InterpreterException(e);
+ throw new RuntimeException(e);
}
try {
@@ -217,7 +198,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
}
if (!running.get()) {
callbackServer.stop();
- throw new InterpreterException("Cannot run interpreter");
+ throw new RuntimeException(new String(cmdOut.toByteArray()));
}
} catch (InterruptedException e) {
logger.error("Remote interpreter is not accessible");
@@ -227,7 +208,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
public void stop() {
// shutdown EventPoller first.
- this.remoteInterpreterEventPoller.shutdown();
+ this.getRemoteInterpreterEventPoller().shutdown();
if (callbackServer.isServing()) {
callbackServer.stop();
}
@@ -266,6 +247,31 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
running.set(false);
}
+ @VisibleForTesting
+ public Map<String, String> getEnv() {
+ return env;
+ }
+
+ @VisibleForTesting
+ public String getLocalRepoDir() {
+ return localRepoDir;
+ }
+
+ @VisibleForTesting
+ public String getInterpreterDir() {
+ return interpreterDir;
+ }
+
+ @VisibleForTesting
+ public String getInterpreterGroupName() {
+ return interpreterGroupName;
+ }
+
+ @VisibleForTesting
+ public String getInterpreterRunner() {
+ return interpreterRunner;
+ }
+
public boolean isRunning() {
return running.get();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index e45f15b..88cc489 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -21,6 +21,7 @@ import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TException;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,27 +29,17 @@ import org.slf4j.LoggerFactory;
/**
* Abstract class for interpreter process
*/
-public abstract class RemoteInterpreterProcess {
+public abstract class RemoteInterpreterProcess implements InterpreterClient {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
private GenericObjectPool<Client> clientPool;
- protected final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
+ private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
private int connectTimeout;
public RemoteInterpreterProcess(
- int connectTimeout,
- RemoteInterpreterProcessListener listener,
- ApplicationEventListener appListener) {
- this(new RemoteInterpreterEventPoller(listener, appListener),
- connectTimeout);
- this.remoteInterpreterEventPoller.setInterpreterProcess(this);
- }
-
- RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
- int connectTimeout) {
+ int connectTimeout) {
this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
- this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
this.connectTimeout = connectTimeout;
}
@@ -56,6 +47,10 @@ public abstract class RemoteInterpreterProcess {
return remoteInterpreterEventPoller;
}
+ public void setRemoteInterpreterEventPoller(RemoteInterpreterEventPoller eventPoller) {
+ this.remoteInterpreterEventPoller = eventPoller;
+ }
+
public abstract String getHost();
public abstract int getPort();
public abstract void start(String userName, Boolean isUserImpersonate);
@@ -147,9 +142,9 @@ public abstract class RemoteInterpreterProcess {
}
} catch (TException e) {
broken = true;
- throw new InterpreterException(e);
+ throw new RuntimeException(e);
} catch (Exception e1) {
- throw new InterpreterException(e1);
+ throw new RuntimeException(e1);
} finally {
if (client != null) {
releaseClient(client, broken);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
index bb176be..d8715a0 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java
@@ -30,12 +30,10 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
public RemoteInterpreterRunningProcess(
int connectTimeout,
- RemoteInterpreterProcessListener listener,
- ApplicationEventListener appListener,
String host,
int port
) {
- super(connectTimeout, listener, appListener);
+ super(connectTimeout);
this.host = host;
this.port = port;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 03c5046..b5dda67 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -627,7 +627,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
if (intp == null) {
String intpExceptionMsg =
p.getJobName() + "'s Interpreter " + requiredReplName + " not found";
- InterpreterException intpException = new InterpreterException(intpExceptionMsg);
+ RuntimeException intpException = new RuntimeException(intpExceptionMsg);
InterpreterResult intpResult =
new InterpreterResult(InterpreterResult.Code.ERROR, intpException.getMessage());
p.setReturn(intpResult, intpException);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 4652fcd..77fd04c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -892,7 +892,11 @@ public class Notebook implements NoteEventListener {
if (releaseResource) {
for (InterpreterSetting setting : notebook.getInterpreterSettingManager()
.getInterpreterSettings(note.getId())) {
- notebook.getInterpreterSettingManager().restart(setting.getId());
+ try {
+ notebook.getInterpreterSettingManager().restart(setting.getId());
+ } catch (InterpreterException e) {
+ logger.error("Fail to restart interpreter: " + setting.getId(), e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index 68ce794..701943a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -312,15 +312,14 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
String replName = getRequiredReplName(trimmedBuffer);
String body = getScriptBody(trimmedBuffer);
- Interpreter repl = getRepl(replName);
- if (repl == null) {
- return null;
- }
-
InterpreterContext interpreterContext = getInterpreterContextWithoutRunner(null);
- List completion = repl.completion(body, cursor, interpreterContext);
- return completion;
+ try {
+ Interpreter repl = getRepl(replName);
+ return repl.completion(body, cursor, interpreterContext);
+ } catch (InterpreterException e) {
+ throw new RuntimeException("Fail to get completion", e);
+ }
}
public int calculateCursorPosition(String buffer, String trimmedBuffer, int cursor) {
@@ -362,11 +361,15 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
@Override
public int progress() {
String replName = getRequiredReplName();
- Interpreter repl = getRepl(replName);
- if (repl != null) {
+
+ try {
+ Interpreter repl = getRepl(replName);
+ if (repl == null) {
+ return 0;
+ }
return repl.getProgress(getInterpreterContext(null));
- } else {
- return 0;
+ } catch (InterpreterException e) {
+ throw new RuntimeException("Fail to get progress", e);
}
}
@@ -494,10 +497,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
protected boolean jobAbort() {
Interpreter repl = getRepl(getRequiredReplName());
if (repl == null) {
- // when interpreters are already destroyed
return true;
}
-
Scheduler scheduler = repl.getScheduler();
if (scheduler == null) {
return true;
@@ -507,7 +508,11 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
if (job != null) {
job.setStatus(Status.ABORT);
} else {
- repl.cancel(getInterpreterContextWithoutRunner(null));
+ try {
+ repl.cancel(getInterpreterContextWithoutRunner(null));
+ } catch (InterpreterException e) {
+ throw new RuntimeException(e);
+ }
}
return true;
}
@@ -738,12 +743,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
}
private boolean isValidInterpreter(String replName) {
- try {
- return factory.getInterpreter(user, note.getId(), replName) != null;
- } catch (InterpreterException e) {
- // ignore this exception, it would be recaught when running paragraph.
- return false;
- }
+ return factory.getInterpreter(user, note.getId(), replName) != null;
}
public void updateRuntimeInfos(String label, String tooltip, Map<String, String> infos,
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9812e26b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
index 8f3e615..f23d433 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.helium;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.ApplicationState;
@@ -241,7 +242,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
@Test
- public void testUnloadOnInterpreterRestart() throws IOException {
+ public void testUnloadOnInterpreterRestart() throws IOException, InterpreterException {
// given
HeliumPackage pkg1 = new HeliumPackage(HeliumType.APPLICATION,
"name1",