You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/06/29 00:54:07 UTC
zeppelin git commit: ZEPPELIN-3218. Plugins for Interpreter Launcher
Repository: zeppelin
Updated Branches:
refs/heads/master 1a3d1d185 -> 93a9aefc9
ZEPPELIN-3218. Plugins for Interpreter Launcher
### What is this PR for?
Move launcher into zeppelin plugins
### What type of PR is it?
[Refactoring]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3218
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3043 from zjffdu/ZEPPELIN-3218 and squashes the following commits:
45ee5844a [Jeff Zhang] ZEPPELIN-3218. Plugins for Interpreter Launcher
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/93a9aefc
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/93a9aefc
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/93a9aefc
Branch: refs/heads/master
Commit: 93a9aefc9d7cfcfd01a7b308d27695460ec847f4
Parents: 1a3d1d1
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Jun 25 15:40:41 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Fri Jun 29 08:53:57 2018 +0800
----------------------------------------------------------------------
zeppelin-plugins/launcher/spark/pom.xml | 58 +++++
.../launcher/SparkInterpreterLauncher.java | 226 +++++++++++++++++++
.../launcher/SparkInterpreterLauncherTest.java | 192 ++++++++++++++++
zeppelin-plugins/launcher/standard/pom.xml | 50 ++++
.../launcher/StandardInterpreterLauncher.java | 100 ++++++++
.../StandardInterpreterLauncherTest.java | 86 +++++++
zeppelin-plugins/pom.xml | 3 +
.../zeppelin/rest/AbstractTestRestApi.java | 2 +
.../interpreter/InterpreterSetting.java | 20 +-
.../launcher/ShellScriptLauncher.java | 100 --------
.../launcher/SparkInterpreterLauncher.java | 226 -------------------
.../apache/zeppelin/plugin/PluginManager.java | 86 +++++--
.../launcher/ShellScriptLauncherTest.java | 86 -------
.../launcher/SparkInterpreterLauncherTest.java | 192 ----------------
14 files changed, 795 insertions(+), 632 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/spark/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/spark/pom.xml b/zeppelin-plugins/launcher/spark/pom.xml
new file mode 100644
index 0000000..88384eb
--- /dev/null
+++ b/zeppelin-plugins/launcher/spark/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zengine-plugins-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../../../zeppelin-plugins</relativePath>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>launcher-spark</artifactId>
+ <packaging>jar</packaging>
+ <version>0.9.0-SNAPSHOT</version>
+ <name>Zeppelin: Plugin SparkInterpreterLauncher</name>
+ <description>Spark Launcher implementation based on shell script interpreter.sh</description>
+
+ <properties>
+ <plugin.name>Launcher/SparkInterpreterLauncher</plugin.name>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>launcher-standard</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
new file mode 100644
index 0000000..ab95e0b
--- /dev/null
+++ b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Spark specific launcher.
+ */
+public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
+
+ public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
+ super(zConf, recoveryStorage);
+ }
+
+ @Override
+ protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
+ Map<String, String> env = new HashMap<String, String>();
+ Properties sparkProperties = new Properties();
+ String sparkMaster = getSparkMaster(properties);
+ for (String key : properties.stringPropertyNames()) {
+ if (RemoteInterpreterUtils.isEnvString(key)) {
+ env.put(key, properties.getProperty(key));
+ }
+ if (isSparkConf(key, properties.getProperty(key))) {
+ sparkProperties.setProperty(key, toShellFormat(properties.getProperty(key)));
+ }
+ }
+
+ setupPropertiesForPySpark(sparkProperties);
+ setupPropertiesForSparkR(sparkProperties);
+ if (isYarnMode() && getDeployMode().equals("cluster")) {
+ env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
+ sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false");
+ }
+
+ StringBuilder sparkConfBuilder = new StringBuilder();
+ if (sparkMaster != null) {
+ sparkConfBuilder.append(" --master " + sparkMaster);
+ }
+ if (isYarnMode() && getDeployMode().equals("cluster")) {
+ sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_yarn_cluster.properties");
+ }
+ for (String name : sparkProperties.stringPropertyNames()) {
+ sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
+ }
+ String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER");
+ if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) ||
+ !useProxyUserEnv.equals("false"))) {
+ sparkConfBuilder.append(" --proxy-user " + context.getUserName());
+ }
+
+ env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+
+ // set these env in the order of
+ // 1. interpreter-setting
+ // 2. zeppelin-env.sh
+ // It is encouraged to set env in interpreter setting, but just for backward compatibility,
+ // we also fall back to zeppelin-env.sh if it is not specified in interpreter setting.
+ for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) {
+ String envValue = getEnv(envName);
+ if (envValue != null) {
+ env.put(envName, envValue);
+ }
+ }
+
+ String keytab = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
+ String principal =
+ zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
+
+ if (!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
+ env.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", keytab);
+ env.put("ZEPPELIN_SERVER_KERBEROS_PRINCIPAL", principal);
+ LOGGER.info("Run Spark under secure mode with keytab: " + keytab +
+ ", principal: " + principal);
+ } else {
+ LOGGER.info("Run Spark under non-secure mode as no keytab and principal is specified");
+ }
+ LOGGER.debug("buildEnvFromProperties: " + env);
+ return env;
+
+ }
+
+
+ /**
+ * Gets an environment variable, looking it up in the following order:
+ *
+ * 1. interpreter setting
+ * 2. zeppelin-env.sh
+ *
+ */
+ private String getEnv(String envName) {
+ String env = properties.getProperty(envName);
+ if (env == null) {
+ env = System.getenv(envName);
+ }
+ return env;
+ }
+
+ private boolean isSparkConf(String key, String value) {
+ return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
+ }
+
+ private void setupPropertiesForPySpark(Properties sparkProperties) {
+ if (isYarnMode()) {
+ sparkProperties.setProperty("spark.yarn.isPython", "true");
+ }
+ }
+
+ private void mergeSparkProperty(Properties sparkProperties, String propertyName,
+ String propertyValue) {
+ if (sparkProperties.containsKey(propertyName)) {
+ String oldPropertyValue = sparkProperties.getProperty(propertyName);
+ sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
+ } else {
+ sparkProperties.setProperty(propertyName, propertyValue);
+ }
+ }
+
+ private void setupPropertiesForSparkR(Properties sparkProperties) {
+ String sparkHome = getEnv("SPARK_HOME");
+ File sparkRBasePath = null;
+ if (sparkHome == null) {
+ if (!getSparkMaster(properties).startsWith("local")) {
+ throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
+ " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
+ " interpreter setting");
+ }
+ String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
+ sparkRBasePath = new File(zeppelinHome,
+ "interpreter" + File.separator + "spark" + File.separator + "R");
+ } else {
+ sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
+ }
+
+ File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
+ if (sparkRPath.exists() && sparkRPath.isFile()) {
+ mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
+ sparkRPath.getAbsolutePath() + "#sparkr");
+ } else {
+ LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
+ }
+ }
+
+ /**
+ * Order to look for spark master
+ * 1. master in interpreter setting
+ * 2. spark.master in interpreter setting
+ * 3. use local[*]
+ * @param properties
+ * @return
+ */
+ private String getSparkMaster(Properties properties) {
+ String master = properties.getProperty("master");
+ if (master == null) {
+ master = properties.getProperty("spark.master");
+ if (master == null) {
+ master = "local[*]";
+ }
+ }
+ return master;
+ }
+
+ private String getDeployMode() {
+ String master = getSparkMaster(properties);
+ if (master.equals("yarn-client")) {
+ return "client";
+ } else if (master.equals("yarn-cluster")) {
+ return "cluster";
+ } else if (master.startsWith("local")) {
+ return "client";
+ } else {
+ String deployMode = properties.getProperty("spark.submit.deployMode");
+ if (deployMode == null) {
+ throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
+ "is not specified");
+ }
+ if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
+ throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
+ }
+ return deployMode;
+ }
+ }
+
+ private boolean isYarnMode() {
+ return getSparkMaster(properties).startsWith("yarn");
+ }
+
+ private String toShellFormat(String value) {
+ if (value.contains("'") && value.contains("\"")) {
+ throw new RuntimeException("Spark property value could not contain both \" and '");
+ } else if (value.contains("'")) {
+ return "\"" + value + "\"";
+ } else {
+ return "'" + value + "'";
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
new file mode 100644
index 0000000..c2abd60
--- /dev/null
+++ b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SparkInterpreterLauncherTest {
+ @Before
+ public void setUp() {
+ for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
+ System.clearProperty(confVar.getVarName());
+ }
+ }
+
+ @Test
+ public void testConnectTimeOut() throws IOException {
+ ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+ SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+ Properties properties = new Properties();
+ properties.setProperty("SPARK_HOME", "/user/spark");
+ properties.setProperty(
+ ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
+ InterpreterOption option = new InterpreterOption();
+ option.setUserImpersonate(true);
+ InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
+ InterpreterClient client = launcher.launch(context);
+ assertTrue( client instanceof RemoteInterpreterManagedProcess);
+ RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
+ assertEquals("name", interpreterProcess.getInterpreterSettingName());
+ assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
+ assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
+ assertEquals(10000, interpreterProcess.getConnectTimeout());
+ assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
+ assertTrue(interpreterProcess.getEnv().size() >= 2);
+ assertEquals(true, interpreterProcess.isUserImpersonated());
+ }
+
+ @Test
+ public void testLocalMode() throws IOException {
+ ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+ SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+ Properties properties = new Properties();
+ properties.setProperty("SPARK_HOME", "/user/spark");
+ properties.setProperty("property_1", "value_1");
+ properties.setProperty("master", "local[*]");
+ properties.setProperty("spark.files", "file_1");
+ properties.setProperty("spark.jars", "jar_1");
+
+ InterpreterOption option = new InterpreterOption();
+ InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
+ InterpreterClient client = launcher.launch(context);
+ assertTrue( client instanceof RemoteInterpreterManagedProcess);
+ RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
+ assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+ assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+ assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+ assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
+ assertTrue(interpreterProcess.getEnv().size() >= 2);
+ assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+ assertEquals(" --master local[*] --conf spark.files='file_1' --conf spark.jars='jar_1'", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+ }
+
+ @Test
+ public void testYarnClientMode_1() throws IOException {
+ ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+ SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+ Properties properties = new Properties();
+ properties.setProperty("SPARK_HOME", "/user/spark");
+ properties.setProperty("property_1", "value_1");
+ properties.setProperty("master", "yarn-client");
+ properties.setProperty("spark.files", "file_1");
+ properties.setProperty("spark.jars", "jar_1");
+
+ InterpreterOption option = new InterpreterOption();
+ InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
+ InterpreterClient client = launcher.launch(context);
+ assertTrue( client instanceof RemoteInterpreterManagedProcess);
+ RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
+ assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+ assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+ assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+ assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
+ assertTrue(interpreterProcess.getEnv().size() >= 2);
+ assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+ assertEquals(" --master yarn-client --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+ }
+
+ @Test
+ public void testYarnClientMode_2() throws IOException {
+ ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+ SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+ Properties properties = new Properties();
+ properties.setProperty("SPARK_HOME", "/user/spark");
+ properties.setProperty("property_1", "value_1");
+ properties.setProperty("master", "yarn");
+ properties.setProperty("spark.submit.deployMode", "client");
+ properties.setProperty("spark.files", "file_1");
+ properties.setProperty("spark.jars", "jar_1");
+
+ InterpreterOption option = new InterpreterOption();
+ InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
+ InterpreterClient client = launcher.launch(context);
+ assertTrue( client instanceof RemoteInterpreterManagedProcess);
+ RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
+ assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+ assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+ assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+ assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
+ assertTrue(interpreterProcess.getEnv().size() >= 2);
+ assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+ assertEquals(" --master yarn --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='client' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+ }
+
+ @Test
+ public void testYarnClusterMode_1() throws IOException {
+ ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+ SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+ Properties properties = new Properties();
+ properties.setProperty("SPARK_HOME", "/user/spark");
+ properties.setProperty("property_1", "value_1");
+ properties.setProperty("master", "yarn-cluster");
+ properties.setProperty("spark.files", "file_1");
+ properties.setProperty("spark.jars", "jar_1");
+
+ InterpreterOption option = new InterpreterOption();
+ InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
+ InterpreterClient client = launcher.launch(context);
+ assertTrue( client instanceof RemoteInterpreterManagedProcess);
+ RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
+ assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+ assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+ assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+ assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
+ assertTrue(interpreterProcess.getEnv().size() >= 3);
+ assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+ assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
+ assertEquals(" --master yarn-cluster --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+ }
+
+ @Test
+ public void testYarnClusterMode_2() throws IOException {
+ ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+ SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+ Properties properties = new Properties();
+ properties.setProperty("SPARK_HOME", "/user/spark");
+ properties.setProperty("property_1", "value_1");
+ properties.setProperty("master", "yarn");
+ properties.setProperty("spark.submit.deployMode", "cluster");
+ properties.setProperty("spark.files", "file_1");
+ properties.setProperty("spark.jars", "jar_1");
+
+ InterpreterOption option = new InterpreterOption();
+ option.setUserImpersonate(true);
+ InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
+ InterpreterClient client = launcher.launch(context);
+ assertTrue( client instanceof RemoteInterpreterManagedProcess);
+ RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
+ assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+ assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+ assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+ assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
+ assertTrue(interpreterProcess.getEnv().size() >= 3);
+ assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+ assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
+ assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/standard/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/standard/pom.xml b/zeppelin-plugins/launcher/standard/pom.xml
new file mode 100644
index 0000000..da961e9
--- /dev/null
+++ b/zeppelin-plugins/launcher/standard/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>zengine-plugins-parent</artifactId>
+ <groupId>org.apache.zeppelin</groupId>
+ <version>0.9.0-SNAPSHOT</version>
+ <relativePath>../../../zeppelin-plugins</relativePath>
+ </parent>
+
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>launcher-standard</artifactId>
+ <packaging>jar</packaging>
+ <version>0.9.0-SNAPSHOT</version>
+ <name>Zeppelin: Plugin StandardLauncher</name>
+ <description>Launcher implementation based on shell script interpreter.sh</description>
+
+ <properties>
+ <plugin.name>Launcher/StandardInterpreterLauncher</plugin.name>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
new file mode 100644
index 0000000..10ab354
--- /dev/null
+++ b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterRunner;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Interpreter launcher which uses a shell script to launch the interpreter process.
+ */
+public class StandardInterpreterLauncher extends InterpreterLauncher {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StandardInterpreterLauncher.class);
+
+ public StandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
+ super(zConf, recoveryStorage);
+ }
+
+ @Override
+ public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
+ LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
+ this.properties = context.getProperties();
+ InterpreterOption option = context.getOption();
+ InterpreterRunner runner = context.getRunner();
+ String groupName = context.getInterpreterSettingGroup();
+ String name = context.getInterpreterSettingName();
+ int connectTimeout = getConnectTimeout();
+
+ if (option.isExistingProcess()) {
+ return new RemoteInterpreterRunningProcess(
+ context.getInterpreterSettingName(),
+ connectTimeout,
+ option.getHost(),
+ option.getPort());
+ } else {
+ // try to recover it first
+ if (zConf.isRecoveryEnabled()) {
+ InterpreterClient recoveredClient =
+ recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
+ if (recoveredClient != null) {
+ if (recoveredClient.isRunning()) {
+ LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" +
+ recoveredClient.getPort());
+ return recoveredClient;
+ } else {
+ LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":"
+ + recoveredClient.getPort() + ", as it is already terminated.");
+ }
+ }
+ }
+
+ // create new remote process
+ String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+ + context.getInterpreterSettingId();
+ return new RemoteInterpreterManagedProcess(
+ runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
+ context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), zConf.getInterpreterPortRange(),
+ zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
+ buildEnvFromProperties(context), connectTimeout, name,
+ context.getInterpreterGroupId(), option.isUserImpersonate());
+ }
+ }
+
+ protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
+ Map<String, String> env = new HashMap<>();
+ for (Object key : context.getProperties().keySet()) {
+ if (RemoteInterpreterUtils.isEnvString((String) key)) {
+ env.put((String) key, context.getProperties().getProperty((String) key));
+ }
+ }
+ return env;
+ }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
new file mode 100644
index 0000000..1861636
--- /dev/null
+++ b/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link StandardInterpreterLauncher} turns a launch context into a
+ * {@link RemoteInterpreterManagedProcess} carrying the expected settings.
+ */
+public class StandardInterpreterLauncherTest {
+
+  @Before
+  public void setUp() {
+    // Clear every Zeppelin configuration variable from the system properties before each test.
+    for (ZeppelinConfiguration.ConfVars var : ZeppelinConfiguration.ConfVars.values()) {
+      System.clearProperty(var.getVarName());
+    }
+  }
+
+  @Test
+  public void testLauncher() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    Properties props = new Properties();
+    props.setProperty("ENV_1", "VALUE_1");
+    props.setProperty("property_1", "value_1");
+
+    RemoteInterpreterManagedProcess process = launchManagedProcess(zConf, props);
+
+    assertEquals("name", process.getInterpreterSettingName());
+    assertEquals(".//interpreter/groupName", process.getInterpreterDir());
+    assertEquals(".//local-repo/groupId", process.getLocalRepoDir());
+    assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(),
+        process.getConnectTimeout());
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), process.getInterpreterRunner());
+    // only the ENV_1 property is expected in the process environment
+    assertEquals(1, process.getEnv().size());
+    assertEquals("VALUE_1", process.getEnv().get("ENV_1"));
+    assertTrue(process.isUserImpersonated());
+  }
+
+  @Test
+  public void testConnectTimeOut() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    Properties props = new Properties();
+    props.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
+
+    RemoteInterpreterManagedProcess process = launchManagedProcess(zConf, props);
+
+    assertEquals("name", process.getInterpreterSettingName());
+    assertEquals(".//interpreter/groupName", process.getInterpreterDir());
+    assertEquals(".//local-repo/groupId", process.getLocalRepoDir());
+    // the timeout set in the interpreter properties is the one the process reports
+    assertEquals(10000, process.getConnectTimeout());
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), process.getInterpreterRunner());
+    assertEquals(0, process.getEnv().size());
+    assertTrue(process.isUserImpersonated());
+  }
+
+  /**
+   * Launches an interpreter with user impersonation enabled and the fixed ids used by all
+   * tests, and checks that the launcher produced a managed process.
+   */
+  private static RemoteInterpreterManagedProcess launchManagedProcess(
+      ZeppelinConfiguration zConf, Properties props) throws IOException {
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new InterpreterLaunchContext(props, option, null,
+        "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
+    InterpreterClient client = new StandardInterpreterLauncher(zConf, null).launch(context);
+    assertTrue(client instanceof RemoteInterpreterManagedProcess);
+    return (RemoteInterpreterManagedProcess) client;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-plugins/pom.xml b/zeppelin-plugins/pom.xml
index abbb602..f67ac9d 100644
--- a/zeppelin-plugins/pom.xml
+++ b/zeppelin-plugins/pom.xml
@@ -46,6 +46,9 @@
<module>notebookrepo/mongodb</module>
<module>notebookrepo/zeppelin-hub</module>
<module>notebookrepo/filesystem</module>
+
+ <module>launcher/standard</module>
+ <module>launcher/spark</module>
</modules>
<dependencies>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 172f117..80165ff 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -35,6 +35,7 @@ import org.apache.commons.httpclient.methods.PutMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.plugin.PluginManager;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
@@ -292,6 +293,7 @@ public abstract class AbstractTestRestApi {
LOG.info("Terminating test Zeppelin...");
ZeppelinServer.jettyWebServer.stop();
executor.shutdown();
+ PluginManager.reset();
long s = System.currentTimeMillis();
boolean started = true;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index b5596e2..3278e2f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -26,7 +26,6 @@ import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;
import com.google.gson.internal.StringMap;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.Dependency;
import org.apache.zeppelin.dep.DependencyResolver;
@@ -35,8 +34,6 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext;
import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
-import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
-import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -44,6 +41,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.plugin.PluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -293,12 +291,9 @@ public class InterpreterSetting {
this.conf = o.getConf();
}
- private void createLauncher() {
- if (group.equals("spark")) {
- this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
- } else {
- this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage);
- }
+ /**
+  * Asks the shared PluginManager for the launcher plugin named by
+  * {@code getLauncherPlugin()} and stores it in {@code launcher}.
+  *
+  * @throws IOException if the plugin manager fails to load the launcher plugin
+  */
+ private void createLauncher() throws IOException {
+   PluginManager pluginManager = PluginManager.get();
+   this.launcher =
+       pluginManager.loadInterpreterLauncher(getLauncherPlugin(), recoveryStorage);
}
public AngularObjectRegistryListener getAngularObjectRegistryListener() {
@@ -665,6 +660,13 @@ public class InterpreterSetting {
runtimeInfosToBeCleared = null;
}
+ /**
+  * Returns the name of the launcher plugin for this setting: the Spark launcher when the
+  * interpreter group is "spark", the standard launcher for every other group.
+  */
+ public String getLauncherPlugin() {
+   return group.equals("spark") ? "SparkInterpreterLauncher" : "StandardInterpreterLauncher";
+ }
//////////////////////////// IMPORTANT ////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
deleted file mode 100644
index 18a6dde..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.InterpreterRunner;
-import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Interpreter Launcher which use shell script to launch the interpreter process.
- */
-public class ShellScriptLauncher extends InterpreterLauncher {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class);
-
- public ShellScriptLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
- super(zConf, recoveryStorage);
- }
-
- @Override
- public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
- LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
- this.properties = context.getProperties();
- InterpreterOption option = context.getOption();
- InterpreterRunner runner = context.getRunner();
- String groupName = context.getInterpreterSettingGroup();
- String name = context.getInterpreterSettingName();
- int connectTimeout = getConnectTimeout();
-
- if (option.isExistingProcess()) {
- return new RemoteInterpreterRunningProcess(
- context.getInterpreterSettingName(),
- connectTimeout,
- option.getHost(),
- option.getPort());
- } else {
- // try to recover it first
- if (zConf.isRecoveryEnabled()) {
- InterpreterClient recoveredClient =
- recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
- if (recoveredClient != null) {
- if (recoveredClient.isRunning()) {
- LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" +
- recoveredClient.getPort());
- return recoveredClient;
- } else {
- LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":"
- + recoveredClient.getPort() + ", as it is already terminated.");
- }
- }
- }
-
- // create new remote process
- String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
- + context.getInterpreterSettingId();
- return new RemoteInterpreterManagedProcess(
- runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
- context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), zConf.getInterpreterPortRange(),
- zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
- buildEnvFromProperties(context), connectTimeout, name,
- context.getInterpreterGroupId(), option.isUserImpersonate());
- }
- }
-
- protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
- Map<String, String> env = new HashMap<>();
- for (Object key : context.getProperties().keySet()) {
- if (RemoteInterpreterUtils.isEnvString((String) key)) {
- env.put((String) key, context.getProperties().getProperty((String) key));
- }
- }
- return env;
- }
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
deleted file mode 100644
index ff65e0d..0000000
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Spark specific launcher.
- */
-public class SparkInterpreterLauncher extends ShellScriptLauncher {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
-
- public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
- super(zConf, recoveryStorage);
- }
-
- @Override
- protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) {
- Map<String, String> env = new HashMap<String, String>();
- Properties sparkProperties = new Properties();
- String sparkMaster = getSparkMaster(properties);
- for (String key : properties.stringPropertyNames()) {
- if (RemoteInterpreterUtils.isEnvString(key)) {
- env.put(key, properties.getProperty(key));
- }
- if (isSparkConf(key, properties.getProperty(key))) {
- sparkProperties.setProperty(key, toShellFormat(properties.getProperty(key)));
- }
- }
-
- setupPropertiesForPySpark(sparkProperties);
- setupPropertiesForSparkR(sparkProperties);
- if (isYarnMode() && getDeployMode().equals("cluster")) {
- env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
- sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false");
- }
-
- StringBuilder sparkConfBuilder = new StringBuilder();
- if (sparkMaster != null) {
- sparkConfBuilder.append(" --master " + sparkMaster);
- }
- if (isYarnMode() && getDeployMode().equals("cluster")) {
- sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_yarn_cluster.properties");
- }
- for (String name : sparkProperties.stringPropertyNames()) {
- sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
- }
- String useProxyUserEnv = System.getenv("ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER");
- if (context.getOption().isUserImpersonate() && (StringUtils.isBlank(useProxyUserEnv) ||
- !useProxyUserEnv.equals("false"))) {
- sparkConfBuilder.append(" --proxy-user " + context.getUserName());
- }
-
- env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
-
- // set these env in the order of
- // 1. interpreter-setting
- // 2. zeppelin-env.sh
- // It is encouraged to set env in interpreter setting, but just for backward compatability,
- // we also fallback to zeppelin-env.sh if it is not specified in interpreter setting.
- for (String envName : new String[]{"SPARK_HOME", "SPARK_CONF_DIR", "HADOOP_CONF_DIR"}) {
- String envValue = getEnv(envName);
- if (envValue != null) {
- env.put(envName, envValue);
- }
- }
-
- String keytab = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
- String principal =
- zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
-
- if (!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) {
- env.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", keytab);
- env.put("ZEPPELIN_SERVER_KERBEROS_PRINCIPAL", principal);
- LOGGER.info("Run Spark under secure mode with keytab: " + keytab +
- ", principal: " + principal);
- } else {
- LOGGER.info("Run Spark under non-secure mode as no keytab and principal is specified");
- }
- LOGGER.debug("buildEnvFromProperties: " + env);
- return env;
-
- }
-
-
- /**
- * get environmental variable in the following order
- *
- * 1. interpreter setting
- * 2. zeppelin-env.sh
- *
- */
- private String getEnv(String envName) {
- String env = properties.getProperty(envName);
- if (env == null) {
- env = System.getenv(envName);
- }
- return env;
- }
-
- private boolean isSparkConf(String key, String value) {
- return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
- }
-
- private void setupPropertiesForPySpark(Properties sparkProperties) {
- if (isYarnMode()) {
- sparkProperties.setProperty("spark.yarn.isPython", "true");
- }
- }
-
- private void mergeSparkProperty(Properties sparkProperties, String propertyName,
- String propertyValue) {
- if (sparkProperties.containsKey(propertyName)) {
- String oldPropertyValue = sparkProperties.getProperty(propertyName);
- sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
- } else {
- sparkProperties.setProperty(propertyName, propertyValue);
- }
- }
-
- private void setupPropertiesForSparkR(Properties sparkProperties) {
- String sparkHome = getEnv("SPARK_HOME");
- File sparkRBasePath = null;
- if (sparkHome == null) {
- if (!getSparkMaster(properties).startsWith("local")) {
- throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
- " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
- " interpreter setting");
- }
- String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
- sparkRBasePath = new File(zeppelinHome,
- "interpreter" + File.separator + "spark" + File.separator + "R");
- } else {
- sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
- }
-
- File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
- if (sparkRPath.exists() && sparkRPath.isFile()) {
- mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
- sparkRPath.getAbsolutePath() + "#sparkr");
- } else {
- LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
- }
- }
-
- /**
- * Order to look for spark master
- * 1. master in interpreter setting
- * 2. spark.master interpreter setting
- * 3. use local[*]
- * @param properties
- * @return
- */
- private String getSparkMaster(Properties properties) {
- String master = properties.getProperty("master");
- if (master == null) {
- master = properties.getProperty("spark.master");
- if (master == null) {
- master = "local[*]";
- }
- }
- return master;
- }
-
- private String getDeployMode() {
- String master = getSparkMaster(properties);
- if (master.equals("yarn-client")) {
- return "client";
- } else if (master.equals("yarn-cluster")) {
- return "cluster";
- } else if (master.startsWith("local")) {
- return "client";
- } else {
- String deployMode = properties.getProperty("spark.submit.deployMode");
- if (deployMode == null) {
- throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
- "is not specified");
- }
- if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
- throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
- }
- return deployMode;
- }
- }
-
- private boolean isYarnMode() {
- return getSparkMaster(properties).startsWith("yarn");
- }
-
- private String toShellFormat(String value) {
- if (value.contains("'") && value.contains("\"")) {
- throw new RuntimeException("Spark property value could not contain both \" and '");
- } else if (value.contains("'")) {
- return "\"" + value + "\"";
- } else {
- return "'" + value + "'";
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
index f573b15..5f7dc1d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java
@@ -17,19 +17,24 @@
package org.apache.zeppelin.plugin;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.List;
-import java.util.ServiceLoader;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Class for loading Plugins
@@ -42,6 +47,8 @@ public class PluginManager {
private ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
private String pluginsDir = zConf.getPluginsDir();
+ private Map<String, InterpreterLauncher> cachedLaunchers = new HashMap<>();
+
public static synchronized PluginManager get() {
if (instance == null) {
instance = new PluginManager();
@@ -65,27 +72,13 @@ public class PluginManager {
}
String simpleClassName = notebookRepoClassName.substring(notebookRepoClassName.lastIndexOf(".") + 1);
- File pluginFolder = new File(pluginsDir + "/NotebookRepo/" + simpleClassName);
- if (!pluginFolder.exists() || pluginFolder.isFile()) {
- LOGGER.warn("pluginFolder " + pluginFolder.getAbsolutePath() +
- " doesn't exist or is not a directory");
- return null;
- }
- List<URL> urls = new ArrayList<>();
- for (File file : pluginFolder.listFiles()) {
- LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of plugin "
- + notebookRepoClassName);
- urls.add(file.toURI().toURL());
- }
- if (urls.isEmpty()) {
- LOGGER.warn("Can not load plugin " + notebookRepoClassName +
- ", because the plugin folder " + pluginFolder + " is empty.");
+ URLClassLoader pluginClassLoader = getPluginClassLoader(pluginsDir, "NotebookRepo", simpleClassName);
+ if (pluginClassLoader == null) {
return null;
}
- URLClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[0]));
NotebookRepo notebookRepo = null;
try {
- notebookRepo = (NotebookRepo) (Class.forName(notebookRepoClassName, true, classLoader)).newInstance();
+ notebookRepo = (NotebookRepo) (Class.forName(notebookRepoClassName, true, pluginClassLoader)).newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
LOGGER.warn("Fail to instantiate notebookrepo from plugin classpath:" + notebookRepoClassName, e);
}
@@ -96,4 +89,59 @@ public class PluginManager {
return notebookRepo;
}
+ /**
+  * Loads the interpreter launcher plugin with the given name, creating it at most once.
+  *
+  * <p>The class {@code org.apache.zeppelin.interpreter.launcher.<launcherPlugin>} is loaded
+  * from the {@code Launcher/<launcherPlugin>} folder below the plugins directory and created
+  * through its {@code (ZeppelinConfiguration, RecoveryStorage)} constructor. Launchers are
+  * cached by plugin name, so a later call returns the cached instance and its
+  * {@code recoveryStorage} argument is not used.
+  *
+  * @param launcherPlugin simple class name of the launcher plugin
+  * @param recoveryStorage recovery storage handed to the launcher constructor
+  * @return the cached or newly created launcher, never {@code null}
+  * @throws IOException if the plugin folder is unusable or the launcher cannot be instantiated;
+  *     a reflective failure is attached as the cause
+  */
+ public synchronized InterpreterLauncher loadInterpreterLauncher(String launcherPlugin,
+                                                                 RecoveryStorage recoveryStorage)
+     throws IOException {
+
+   // The cache never holds null values, so one lookup replaces containsKey + get.
+   InterpreterLauncher cachedLauncher = cachedLaunchers.get(launcherPlugin);
+   if (cachedLauncher != null) {
+     return cachedLauncher;
+   }
+   LOGGER.info("Loading Interpreter Launcher Plugin: " + launcherPlugin);
+   URLClassLoader pluginClassLoader = getPluginClassLoader(pluginsDir, "Launcher", launcherPlugin);
+   if (pluginClassLoader == null) {
+     // Class.forName with a null loader uses the bootstrap class loader, which cannot see
+     // plugin classes; report the real reason instead of a misleading ClassNotFoundException.
+     throw new IOException("Fail to load plugin: " + launcherPlugin
+         + ", because its plugin folder is missing or empty");
+   }
+   String pluginClass = "org.apache.zeppelin.interpreter.launcher." + launcherPlugin;
+   InterpreterLauncher launcher;
+   try {
+     launcher = (InterpreterLauncher) (Class.forName(pluginClass, true, pluginClassLoader))
+         .getConstructor(ZeppelinConfiguration.class, RecoveryStorage.class)
+         .newInstance(zConf, recoveryStorage);
+   } catch (InstantiationException | IllegalAccessException | ClassNotFoundException
+       | NoSuchMethodException | InvocationTargetException e) {
+     // Keep the root cause attached so the caller can see why instantiation failed.
+     throw new IOException("Fail to load plugin: " + launcherPlugin, e);
+   }
+   cachedLaunchers.put(launcherPlugin, launcher);
+   return launcher;
+ }
+
+ /**
+  * Builds a class loader over every file found in
+  * {@code <pluginsDir>/<pluginType>/<pluginName>}.
+  *
+  * @param pluginsDir root directory that holds all plugins
+  * @param pluginType sub folder for the kind of plugin, e.g. "NotebookRepo" or "Launcher"
+  * @param pluginName folder name of the plugin
+  * @return a class loader for the plugin files, or {@code null} when the folder is missing,
+  *     is a regular file, cannot be listed or contains no files
+  * @throws IOException if a plugin file cannot be converted to a URL
+  */
+ private URLClassLoader getPluginClassLoader(String pluginsDir,
+                                             String pluginType,
+                                             String pluginName) throws IOException {
+
+   File pluginFolder = new File(pluginsDir + "/" + pluginType + "/" + pluginName);
+   if (!pluginFolder.exists() || pluginFolder.isFile()) {
+     LOGGER.warn("PluginFolder " + pluginFolder.getAbsolutePath() +
+         " doesn't exist or is not a directory");
+     return null;
+   }
+   // listFiles() returns null, not an empty array, when the folder cannot be read.
+   File[] pluginFiles = pluginFolder.listFiles();
+   if (pluginFiles == null) {
+     LOGGER.warn("Can not load plugin " + pluginName +
+         ", because the plugin folder " + pluginFolder + " can not be listed.");
+     return null;
+   }
+   List<URL> urls = new ArrayList<>();
+   for (File file : pluginFiles) {
+     LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of plugin: "
+         + pluginName);
+     urls.add(file.toURI().toURL());
+   }
+   if (urls.isEmpty()) {
+     LOGGER.warn("Can not load plugin " + pluginName +
+         ", because the plugin folder " + pluginFolder + " is empty.");
+     return null;
+   }
+   return new URLClassLoader(urls.toArray(new URL[0]));
+ }
+
+ /**
+  * Drops the singleton so that the next {@code get()} call creates a new PluginManager.
+  * Takes the same class-level lock as {@code get()}, so the two never touch
+  * {@code instance} concurrently.
+  */
+ @VisibleForTesting
+ public static synchronized void reset() {
+   instance = null;
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
deleted file mode 100644
index ace3f31..0000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ShellScriptLauncherTest {
- @Before
- public void setUp() {
- for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
- System.clearProperty(confVar.getVarName());
- }
- }
-
- @Test
- public void testLauncher() throws IOException {
- ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null);
- Properties properties = new Properties();
- properties.setProperty("ENV_1", "VALUE_1");
- properties.setProperty("property_1", "value_1");
- InterpreterOption option = new InterpreterOption();
- option.setUserImpersonate(true);
- InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
- InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
- assertEquals("name", interpreterProcess.getInterpreterSettingName());
- assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
- assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
- assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(),
- interpreterProcess.getConnectTimeout());
- assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
- assertEquals(1, interpreterProcess.getEnv().size());
- assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
- assertEquals(true, interpreterProcess.isUserImpersonated());
- }
-
- @Test
- public void testConnectTimeOut() throws IOException {
- ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null);
- Properties properties = new Properties();
- properties.setProperty(
- ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
- InterpreterOption option = new InterpreterOption();
- option.setUserImpersonate(true);
- InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
- InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
- assertEquals("name", interpreterProcess.getInterpreterSettingName());
- assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
- assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
- assertEquals(10000, interpreterProcess.getConnectTimeout());
- assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
- assertEquals(0, interpreterProcess.getEnv().size());
- assertEquals(true, interpreterProcess.isUserImpersonated());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/93a9aefc/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
deleted file mode 100644
index c2abd60..0000000
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class SparkInterpreterLauncherTest {
- @Before
- public void setUp() {
- for (final ZeppelinConfiguration.ConfVars confVar : ZeppelinConfiguration.ConfVars.values()) {
- System.clearProperty(confVar.getVarName());
- }
- }
-
- @Test
- public void testConnectTimeOut() throws IOException {
- ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
- Properties properties = new Properties();
- properties.setProperty("SPARK_HOME", "/user/spark");
- properties.setProperty(
- ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
- InterpreterOption option = new InterpreterOption();
- option.setUserImpersonate(true);
- InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");
- InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
- assertEquals("name", interpreterProcess.getInterpreterSettingName());
- assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
- assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
- assertEquals(10000, interpreterProcess.getConnectTimeout());
- assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
- assertTrue(interpreterProcess.getEnv().size() >= 2);
- assertEquals(true, interpreterProcess.isUserImpersonated());
- }
-
- @Test
- public void testLocalMode() throws IOException {
- ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
- Properties properties = new Properties();
- properties.setProperty("SPARK_HOME", "/user/spark");
- properties.setProperty("property_1", "value_1");
- properties.setProperty("master", "local[*]");
- properties.setProperty("spark.files", "file_1");
- properties.setProperty("spark.jars", "jar_1");
-
- InterpreterOption option = new InterpreterOption();
- InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
- InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
- assertEquals("spark", interpreterProcess.getInterpreterSettingName());
- assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
- assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
- assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
- assertTrue(interpreterProcess.getEnv().size() >= 2);
- assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
- assertEquals(" --master local[*] --conf spark.files='file_1' --conf spark.jars='jar_1'", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
- }
-
- @Test
- public void testYarnClientMode_1() throws IOException {
- ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
- Properties properties = new Properties();
- properties.setProperty("SPARK_HOME", "/user/spark");
- properties.setProperty("property_1", "value_1");
- properties.setProperty("master", "yarn-client");
- properties.setProperty("spark.files", "file_1");
- properties.setProperty("spark.jars", "jar_1");
-
- InterpreterOption option = new InterpreterOption();
- InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
- InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
- assertEquals("spark", interpreterProcess.getInterpreterSettingName());
- assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
- assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
- assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
- assertTrue(interpreterProcess.getEnv().size() >= 2);
- assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
- assertEquals(" --master yarn-client --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
- }
-
- @Test
- public void testYarnClientMode_2() throws IOException {
- ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
- Properties properties = new Properties();
- properties.setProperty("SPARK_HOME", "/user/spark");
- properties.setProperty("property_1", "value_1");
- properties.setProperty("master", "yarn");
- properties.setProperty("spark.submit.deployMode", "client");
- properties.setProperty("spark.files", "file_1");
- properties.setProperty("spark.jars", "jar_1");
-
- InterpreterOption option = new InterpreterOption();
- InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
- InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
- assertEquals("spark", interpreterProcess.getInterpreterSettingName());
- assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
- assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
- assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
- assertTrue(interpreterProcess.getEnv().size() >= 2);
- assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
- assertEquals(" --master yarn --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='client' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
- }
-
- @Test
- public void testYarnClusterMode_1() throws IOException {
- ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
- Properties properties = new Properties();
- properties.setProperty("SPARK_HOME", "/user/spark");
- properties.setProperty("property_1", "value_1");
- properties.setProperty("master", "yarn-cluster");
- properties.setProperty("spark.files", "file_1");
- properties.setProperty("spark.jars", "jar_1");
-
- InterpreterOption option = new InterpreterOption();
- InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
- InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
- assertEquals("spark", interpreterProcess.getInterpreterSettingName());
- assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
- assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
- assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
- assertTrue(interpreterProcess.getEnv().size() >= 3);
- assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
- assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
- assertEquals(" --master yarn-cluster --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
- }
-
- @Test
- public void testYarnClusterMode_2() throws IOException {
- ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
- Properties properties = new Properties();
- properties.setProperty("SPARK_HOME", "/user/spark");
- properties.setProperty("property_1", "value_1");
- properties.setProperty("master", "yarn");
- properties.setProperty("spark.submit.deployMode", "cluster");
- properties.setProperty("spark.files", "file_1");
- properties.setProperty("spark.jars", "jar_1");
-
- InterpreterOption option = new InterpreterOption();
- option.setUserImpersonate(true);
- InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
- InterpreterClient client = launcher.launch(context);
- assertTrue( client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
- assertEquals("spark", interpreterProcess.getInterpreterSettingName());
- assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
- assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
- assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
- assertTrue(interpreterProcess.getEnv().size() >= 3);
- assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
- assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
- assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
- }
-}