You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2017/09/18 22:41:00 UTC
[1/2] zeppelin git commit: ZEPPELIN-2898. Support Yarn-Cluster for
Spark Interpreter
Repository: zeppelin
Updated Branches:
refs/heads/master d25639caa -> 5d7151097
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
index 98a08d9..d62c308 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepoTest.java
@@ -73,9 +73,7 @@ public class VFSNotebookRepoTest extends AbstractInterpreterTest implements JobL
@After
public void tearDown() throws Exception {
- if (!FileUtils.deleteQuietly(testRootDir)) {
- LOG.error("Failed to delete {} ", testRootDir.getName());
- }
+ super.tearDown();
}
@Test
[2/2] zeppelin git commit: ZEPPELIN-2898. Support Yarn-Cluster for
Spark Interpreter
Posted by zj...@apache.org.
ZEPPELIN-2898. Support Yarn-Cluster for Spark Interpreter
### What is this PR for?
This is the first version of yarn-cluster support for `SparkInterpreter`. It simply delegates all the work to `spark-submit`: yarn-cluster mode is natively supported by Spark, so there is no need to reinvent the wheel. There is still room for improvement, e.g. some Spark-specific logic was put in `InterpreterSetting`, which is not good practice. I plan to improve this when I refactor the `Interpreter` class (ZEPPELIN-2685).
Besides that, I also added `MiniHadoopCluster` and `MiniZeppelin`, which enable integration tests of yarn-client and yarn-cluster mode; otherwise both modes would have to be verified manually, which could easily let regressions slip in later.
Note:
* SPARK_HOME must be specified for yarn-cluster mode
* HADOOP_CONF_DIR must be specified for yarn-cluster mode
### What type of PR is it?
[Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-2898 (development branch: https://github.com/zjffdu/zeppelin/tree/ZEPPELIN-2898)
### How should this be tested?
A system test is added in `SparkInterpreterModeTest`.
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2577 from zjffdu/ZEPPELIN-2898 and squashes the following commits:
9da7c4b [Jeff Zhang] ZEPPELIN-2898. Support Yarn-Cluster for Spark Interpreter
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5d715109
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5d715109
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5d715109
Branch: refs/heads/master
Commit: 5d7151097e171c5ec9f22f150ac4ce92b5512013
Parents: d25639c
Author: Jeff Zhang <zj...@apache.org>
Authored: Mon Sep 4 21:54:56 2017 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Sep 19 06:40:51 2017 +0800
----------------------------------------------------------------------
.travis.yml | 4 +-
bin/common.sh | 6 +-
bin/interpreter.sh | 12 +-
conf/log4j_yarn_cluster.properties | 23 ++
docs/interpreter/spark.md | 2 +-
.../zeppelin/python/IPythonInterpreter.java | 44 +--
.../zeppelin/spark/IPySparkInterpreter.java | 11 +-
.../zeppelin/spark/PySparkInterpreter.java | 22 +-
.../org/apache/zeppelin/spark/PythonUtils.java | 96 +++++++
.../apache/zeppelin/spark/SparkInterpreter.java | 99 +------
.../src/main/resources/interpreter-setting.json | 4 +-
.../zeppelin/spark/IPySparkInterpreterTest.java | 1 +
.../spark/PySparkInterpreterMatplotlibTest.java | 2 +-
.../zeppelin/spark/PySparkInterpreterTest.java | 2 +-
.../interpreter/InterpreterContext.java | 47 ++-
zeppelin-server/pom.xml | 12 +
.../zeppelin/integration/AuthenticationIT.java | 2 +-
.../integration/InterpreterModeActionsIT.java | 2 +-
.../integration/PersonalizeActionsIT.java | 2 +-
.../zeppelin/rest/AbstractTestRestApi.java | 8 +-
zeppelin-zengine/pom.xml | 286 ++++++++++++++++++-
.../zeppelin/conf/ZeppelinConfiguration.java | 2 +-
.../interpreter/InterpreterSetting.java | 131 ++++++++-
.../interpreter/InterpreterSettingManager.java | 15 +-
.../remote/RemoteInterpreterEventPoller.java | 6 +-
.../remote/RemoteInterpreterManagedProcess.java | 2 +
.../remote/RemoteInterpreterProcess.java | 2 +-
.../interpreter/AbstractInterpreterTest.java | 23 +-
.../zeppelin/interpreter/MiniHadoopCluster.java | 114 ++++++++
.../zeppelin/interpreter/MiniZeppelin.java | 68 +++++
.../interpreter/SparkInterpreterModeTest.java | 147 ++++++++++
.../notebook/repo/VFSNotebookRepoTest.java | 4 +-
32 files changed, 1007 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 0d9e72a..4495aa4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -66,7 +66,8 @@ matrix:
# Several tests were excluded from this configuration due to the following issues:
# HeliumApplicationFactoryTest - https://issues.apache.org/jira/browse/ZEPPELIN-2470
# After issues are fixed these tests need to be included back by removing them from the "-Dtests.to.exclude" property
- - jdk: "oraclejdk8"
+ - sudo: required
+ jdk: "oraclejdk8"
dist: precise
env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.2 -Pweb-ci -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org.apache.zeppelin.spark.*,**/HeliumApplicationFactoryTest.java -DfailIfNoTests=false"
@@ -143,6 +144,7 @@ before_script:
- if [[ -n $LIVY_VER ]]; then ./testing/downloadLivy.sh $LIVY_VER; fi
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-$LIVY_VER-bin; fi
- if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
+ - export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
- echo "export ZEPPELIN_HELIUM_REGISTRY=helium" >> conf/zeppelin-env.sh
- tail conf/zeppelin-env.sh
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/bin/common.sh
----------------------------------------------------------------------
diff --git a/bin/common.sh b/bin/common.sh
index c7100c7..d425cb1 100644
--- a/bin/common.sh
+++ b/bin/common.sh
@@ -122,7 +122,11 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
export JAVA_OPTS
JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
-JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
+if [[ -z "${SPARK_YARN_CLUSTER}" ]]; then
+ JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
+else
+ JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
+fi
export JAVA_INTP_OPTS
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index fd93a06..5245e25 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -143,7 +143,12 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}"
fi
unset PYSPARKPATH
+ export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}"
+ fi
+ if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
+ ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
+ else
# autodetect HADOOP_CONF_HOME by heuristic
if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then
if [[ -d "${HADOOP_HOME}/etc/hadoop" ]]; then
@@ -152,13 +157,8 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
export HADOOP_CONF_DIR="/etc/hadoop/conf"
fi
fi
-
- if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
- ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
- fi
-
- export SPARK_CLASSPATH+=":${ZEPPELIN_INTP_CLASSPATH}"
fi
+
elif [[ "${INTERPRETER_ID}" == "hbase" ]]; then
if [[ -n "${HBASE_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HBASE_CONF_DIR}"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/conf/log4j_yarn_cluster.properties
----------------------------------------------------------------------
diff --git a/conf/log4j_yarn_cluster.properties b/conf/log4j_yarn_cluster.properties
new file mode 100644
index 0000000..532fc5e
--- /dev/null
+++ b/conf/log4j_yarn_cluster.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger = INFO, stdout
+
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/docs/interpreter/spark.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/spark.md b/docs/interpreter/spark.md
index 780c60a..be5b3e5 100644
--- a/docs/interpreter/spark.md
+++ b/docs/interpreter/spark.md
@@ -424,7 +424,7 @@ It creates separated SparkContext per each notebook in `isolated` mode.
## IPython support
By default, zeppelin would use IPython in `pyspark` when IPython is available, Otherwise it would fall back to the original PySpark implementation.
-If you don't want to use IPython, then you can set `zeppelin.spark.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
+If you don't want to use IPython, then you can set `zeppelin.pyspark.useIPython` as `false` in interpreter setting. For the IPython features, you can refer doc
[Python Interpreter](python.html)
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
index 9b6f730..193c343 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
@@ -78,6 +78,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
private long ipythonLaunchTimeout;
private String additionalPythonPath;
private String additionalPythonInitFile;
+ private boolean useBuiltinPy4j = true;
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
@@ -92,6 +93,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
* @param additionalPythonPath
*/
public void setAdditionalPythonPath(String additionalPythonPath) {
+ LOGGER.info("setAdditionalPythonPath: " + additionalPythonPath);
this.additionalPythonPath = additionalPythonPath;
}
@@ -105,6 +107,10 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
this.additionalPythonInitFile = additionalPythonInitFile;
}
+ public void setAddBulitinPy4j(boolean add) {
+ this.useBuiltinPy4j = add;
+ }
+
@Override
public void open() {
try {
@@ -113,6 +119,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
return;
}
pythonExecutable = getProperty().getProperty("zeppelin.python", "python");
+ LOGGER.info("Python Exec: " + pythonExecutable);
ipythonLaunchTimeout = Long.parseLong(
getProperty().getProperty("zeppelin.ipython.launch.timeout", "30000"));
this.zeppelinContext = new PythonZeppelinContext(
@@ -218,29 +225,34 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
watchDog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchDog);
- String py4jLibPath = null;
- if (System.getenv("ZEPPELIN_HOME") != null) {
- py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
- + PythonInterpreter.ZEPPELIN_PY4JPATH;
- } else {
- Path workingPath = Paths.get("..").toAbsolutePath();
- py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
- }
- if (additionalPythonPath != null) {
- // put the py4j at the end, because additionalPythonPath may already contain py4j.
- // e.g. PySparkInterpreter
- additionalPythonPath = additionalPythonPath + ":" + py4jLibPath;
- } else {
- additionalPythonPath = py4jLibPath;
+ if (useBuiltinPy4j) {
+ String py4jLibPath = null;
+ if (System.getenv("ZEPPELIN_HOME") != null) {
+ py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator
+ + PythonInterpreter.ZEPPELIN_PY4JPATH;
+ } else {
+ Path workingPath = Paths.get("..").toAbsolutePath();
+ py4jLibPath = workingPath + File.separator + PythonInterpreter.ZEPPELIN_PY4JPATH;
+ }
+ if (additionalPythonPath != null) {
+ // put the py4j at the end, because additionalPythonPath may already contain py4j.
+ // e.g. PySparkInterpreter
+ additionalPythonPath = additionalPythonPath + ":" + py4jLibPath;
+ } else {
+ additionalPythonPath = py4jLibPath;
+ }
}
+
Map<String, String> envs = EnvironmentUtils.getProcEnvironment();
if (envs.containsKey("PYTHONPATH")) {
- envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
+ if (additionalPythonPath != null) {
+ envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
+ }
} else {
envs.put("PYTHONPATH", additionalPythonPath);
}
- LOGGER.debug("PYTHONPATH: " + envs.get("PYTHONPATH"));
+ LOGGER.info("PYTHONPATH: " + envs.get("PYTHONPATH"));
executor.execute(cmd, envs, this);
// wait until IPython kernel is started or timeout
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index f1b1435..56b3823 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -44,12 +44,15 @@ public class IPySparkInterpreter extends IPythonInterpreter {
@Override
public void open() {
- getProperty().setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
+ property.setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
sparkInterpreter = getSparkInterpreter();
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
- String additionalPythonPath = conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
- ":../interpreter/lib/python";
- setAdditionalPythonPath(additionalPythonPath);
+ // only set PYTHONPATH in local or yarn-client mode.
+ // yarn-cluster will setup PYTHONPATH automatically.
+ if (!conf.get("spark.submit.deployMode").equals("cluster")) {
+ setAdditionalPythonPath(PythonUtils.sparkPythonPath());
+ setAddBulitinPy4j(false);
+ }
setAdditionalPythonInitFile("python/zeppelin_ipyspark.py");
super.open();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index e65df22..dd32059 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -115,7 +115,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
public void open() {
// try IPySparkInterpreter first
iPySparkInterpreter = getIPySparkInterpreter();
- if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true") &&
+ if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true") &&
iPySparkInterpreter.checkIPythonPrerequisite()) {
try {
iPySparkInterpreter.open();
@@ -133,7 +133,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
iPySparkInterpreter = null;
- if (property.getProperty("zeppelin.spark.useIPython", "true").equals("true")) {
+ if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) {
// don't print it when it is in testing, just for easy output check in test.
try {
InterpreterContext.get().out.write(("IPython is not available, " +
@@ -202,13 +202,18 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
- private Map setupPySparkEnv() throws IOException{
+ private Map setupPySparkEnv() throws IOException {
Map env = EnvironmentUtils.getProcEnvironment();
- if (!env.containsKey("PYTHONPATH")) {
- SparkConf conf = getSparkConf();
- env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
- ":../interpreter/lib/python");
+ // only set PYTHONPATH in local or yarn-client mode.
+ // yarn-cluster will setup PYTHONPATH automatically.
+ SparkConf conf = getSparkConf();
+ if (!conf.get("spark.submit.deployMode", "client").equals("cluster")) {
+ if (!env.containsKey("PYTHONPATH")) {
+ env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
+ } else {
+ env.put("PYTHONPATH", PythonUtils.sparkPythonPath());
+ }
}
// get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT
@@ -223,7 +228,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
- LOGGER.debug("PYTHONPATH: " + env.get("PYTHONPATH"));
+ LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH"));
return env;
}
@@ -251,6 +256,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
gatewayServer.start();
String pythonExec = getPythonExec(property);
+ LOGGER.info("pythonExec: " + pythonExec);
CommandLine cmd = CommandLine.parse(pythonExec);
cmd.addArgument(scriptPath, false);
cmd.addArgument(Integer.toString(port), false);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java b/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
new file mode 100644
index 0000000..8182690
--- /dev/null
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PythonUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.spark;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Util class for PySpark
+ */
+public class PythonUtils {
+
+  /**
+   * Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from ZEPPELIN_HOME
+   * when it is embedded mode.
+   *
+   * This method will be called in the zeppelin server process and in the spark driver process
+   * when it is local or yarn-client mode.
+   *
+   * @return ":"-joined paths of pyspark.zip, the py4j archive and the Zeppelin python lib dir
+   * @throws RuntimeException if pyspark.zip is missing, or if zero or several py4j files exist
+   */
+  public static String sparkPythonPath() {
+    List<String> pythonPath = new ArrayList<String>();
+    String sparkHome = System.getenv("SPARK_HOME");
+    String zeppelinHome = System.getenv("ZEPPELIN_HOME");
+    if (zeppelinHome == null) {
+      zeppelinHome = new File("..").getAbsolutePath();
+    }
+    if (sparkHome != null) {
+      // non-embedded mode when SPARK_HOME is specified.
+      File pyspark = new File(sparkHome, "python/lib/pyspark.zip");
+      if (!pyspark.exists()) {
+        throw new RuntimeException("No pyspark.zip found under " + sparkHome + "/python/lib");
+      }
+      pythonPath.add(pyspark.getAbsolutePath());
+      pythonPath.add(findPy4j(new File(sparkHome + "/python/lib"), sparkHome + "/python/lib"));
+    } else {
+      // embedded mode: pyspark.zip and py4j are looked up under ZEPPELIN_HOME.
+      File pyspark = new File(zeppelinHome, "interpreter/spark/pyspark/pyspark.zip");
+      if (!pyspark.exists()) {
+        throw new RuntimeException("No pyspark.zip found: " + pyspark.getAbsolutePath());
+      }
+      pythonPath.add(pyspark.getAbsolutePath());
+      pythonPath.add(findPy4j(new File(zeppelinHome, "interpreter/spark/pyspark"),
+          zeppelinHome + "/interpreter/spark/pyspark"));
+    }
+
+    // add ${ZEPPELIN_HOME}/interpreter/lib/python for all the cases
+    pythonPath.add(zeppelinHome + "/interpreter/lib/python");
+    return StringUtils.join(pythonPath, ":");
+  }
+
+  /**
+   * Return the absolute path of the single file in libDir whose name starts with "py4j".
+   *
+   * @param libDir directory to search
+   * @param libDirLabel how libDir is shown in error messages
+   * @throws RuntimeException if no py4j file, or more than one, is found
+   */
+  private static String findPy4j(File libDir, String libDirLabel) {
+    File[] py4j = libDir.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.startsWith("py4j");
+      }
+    });
+    // listFiles returns null if libDir can't be listed; report "not found" instead of an NPE.
+    if (py4j == null || py4j.length == 0) {
+      throw new RuntimeException("No py4j files found under " + libDirLabel);
+    } else if (py4j.length > 1) {
+      throw new RuntimeException("Multiple py4j files found under " + libDirLabel);
+    }
+    return py4j[0].getAbsolutePath();
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index fd12a72..18da034 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -351,7 +351,11 @@ public class SparkInterpreter extends Interpreter {
}
public boolean isYarnMode() {
- return getProperty("master").startsWith("yarn");
+ String master = getProperty("master");
+ if (master == null) {
+ master = getProperty().getProperty("spark.master", "local[*]");
+ }
+ return master.startsWith("yarn");
}
/**
@@ -371,11 +375,6 @@ public class SparkInterpreter extends Interpreter {
conf.set("spark.executor.uri", execUri);
}
conf.set("spark.scheduler.mode", "FAIR");
- conf.setMaster(getProperty("master"));
- if (isYarnMode()) {
- conf.set("master", "yarn");
- conf.set("spark.submit.deployMode", "client");
- }
Properties intpProperty = getProperty();
for (Object k : intpProperty.keySet()) {
@@ -394,8 +393,6 @@ public class SparkInterpreter extends Interpreter {
}
}
- setupConfForPySpark(conf);
- setupConfForSparkR(conf);
Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
@@ -529,96 +526,10 @@ public class SparkInterpreter extends Interpreter {
}
}
}
- setupConfForPySpark(conf);
- setupConfForSparkR(conf);
SparkContext sparkContext = new SparkContext(conf);
return sparkContext;
}
- private void setupConfForPySpark(SparkConf conf) {
- Object pysparkBaseProperty =
- new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue();
- String pysparkBasePath = pysparkBaseProperty != null ? pysparkBaseProperty.toString() : null;
- File pysparkPath;
- if (null == pysparkBasePath) {
- pysparkBasePath =
- new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../")
- .getValue().toString();
- pysparkPath = new File(pysparkBasePath,
- "interpreter" + File.separator + "spark" + File.separator + "pyspark");
- } else {
- pysparkPath = new File(pysparkBasePath,
- "python" + File.separator + "lib");
- }
-
- //Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist
- //TODO(zjffdu), this is not maintainable when new version is added.
- String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip",
- "py4j-0.10.1-src.zip", "py4j-0.10.3-src.zip", "py4j-0.10.4-src.zip"};
- ArrayList<String> pythonLibUris = new ArrayList<>();
- for (String lib : pythonLibs) {
- File libFile = new File(pysparkPath, lib);
- if (libFile.exists()) {
- pythonLibUris.add(libFile.toURI().toString());
- }
- }
- pythonLibUris.trimToSize();
-
- // Distribute two libraries(pyspark.zip and py4j-*.zip) to workers
- // when spark version is less than or equal to 1.4.1
- if (pythonLibUris.size() == 2) {
- try {
- String confValue = conf.get("spark.yarn.dist.files");
- conf.set("spark.yarn.dist.files", confValue + "," + Joiner.on(",").join(pythonLibUris));
- } catch (NoSuchElementException e) {
- conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris));
- }
- if (!useSparkSubmit()) {
- conf.set("spark.files", conf.get("spark.yarn.dist.files"));
- }
- conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
- conf.set("spark.submit.pyFiles", Joiner.on(",").join(pythonLibUris));
- }
-
- // Distributes needed libraries to workers
- // when spark version is greater than or equal to 1.5.0
- if (isYarnMode()) {
- conf.set("spark.yarn.isPython", "true");
- }
- }
-
- private void setupConfForSparkR(SparkConf conf) {
- Object sparkRBaseProperty =
- new DefaultInterpreterProperty("SPARK_HOME", null, null).getValue();
- String sparkRBasePath = sparkRBaseProperty != null ? sparkRBaseProperty.toString() : null;
- File sparkRPath;
- if (null == sparkRBasePath) {
- sparkRBasePath =
- new DefaultInterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../")
- .getValue().toString();
- sparkRPath = new File(sparkRBasePath,
- "interpreter" + File.separator + "spark" + File.separator + "R");
- } else {
- sparkRPath = new File(sparkRBasePath, "R" + File.separator + "lib");
- }
-
- sparkRPath = new File(sparkRPath, "sparkr.zip");
- if (sparkRPath.exists() && sparkRPath.isFile()) {
- String archives = null;
- if (conf.contains("spark.yarn.dist.archives")) {
- archives = conf.get("spark.yarn.dist.archives");
- }
- if (archives != null) {
- archives = archives + "," + sparkRPath + "#sparkr";
- } else {
- archives = sparkRPath + "#sparkr";
- }
- conf.set("spark.yarn.dist.archives", archives);
- } else {
- logger.warn("sparkr.zip is not found, sparkr may not work.");
- }
- }
-
static final String toString(Object o) {
return (o instanceof String) ? (String) o : "";
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/interpreter-setting.json b/spark/src/main/resources/interpreter-setting.json
index d646805..7c13c49 100644
--- a/spark/src/main/resources/interpreter-setting.json
+++ b/spark/src/main/resources/interpreter-setting.json
@@ -150,9 +150,9 @@
"description": "Python command to run pyspark with",
"type": "string"
},
- "zeppelin.spark.useIPython": {
+ "zeppelin.pyspark.useIPython": {
"envName": null,
- "propertyName": "zeppelin.spark.useIPython",
+ "propertyName": "zeppelin.pyspark.useIPython",
"defaultValue": true,
"description": "whether use IPython when it is available",
"type": "checkbox"
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index 5a2e884..3f7cf75 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -59,6 +59,7 @@ public class IPySparkInterpreterTest {
Properties p = new Properties();
p.setProperty("spark.master", "local[4]");
p.setProperty("master", "local[4]");
+ p.setProperty("spark.submit.deployMode", "client");
p.setProperty("spark.app.name", "Zeppelin Test");
p.setProperty("zeppelin.spark.useHiveContext", "true");
p.setProperty("zeppelin.spark.maxResult", "1000");
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
index c6eb1d4..d695037 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
@@ -89,7 +89,7 @@ public class PySparkInterpreterMatplotlibTest {
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
- p.setProperty("zeppelin.spark.useIPython", "false");
+ p.setProperty("zeppelin.pyspark.useIPython", "false");
return p;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index ffdb4e8..7a4abd6 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -59,7 +59,7 @@ public class PySparkInterpreterTest {
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.pyspark.python", "python");
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
- p.setProperty("zeppelin.spark.useIPython", "false");
+ p.setProperty("zeppelin.pyspark.useIPython", "false");
return p;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 4288ea3..f5fc70b 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -17,6 +17,8 @@
package org.apache.zeppelin.interpreter;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,7 +36,7 @@ import org.apache.zeppelin.resource.ResourcePool;
public class InterpreterContext {
private static final ThreadLocal<InterpreterContext> threadIC = new ThreadLocal<>();
- public final InterpreterOutput out;
+ public InterpreterOutput out;
public static InterpreterContext get() {
return threadIC.get();
@@ -48,21 +50,46 @@ public class InterpreterContext {
threadIC.remove();
}
- private final String noteId;
- private final String replName;
- private final String paragraphTitle;
- private final String paragraphId;
- private final String paragraphText;
+ private String noteId;
+ private String replName;
+ private String paragraphTitle;
+ private String paragraphId;
+ private String paragraphText;
private AuthenticationInfo authenticationInfo;
- private final Map<String, Object> config;
- private GUI gui;
+ private Map<String, Object> config = new HashMap<>();
+ private GUI gui = new GUI();
private AngularObjectRegistry angularObjectRegistry;
private ResourcePool resourcePool;
- private List<InterpreterContextRunner> runners;
+ private List<InterpreterContextRunner> runners = new ArrayList<>();
private String className;
private RemoteEventClientWrapper client;
private RemoteWorksController remoteWorksController;
- private final Map<String, Integer> progressMap;
+ private Map<String, Integer> progressMap;
+
+  /**
+   * Incrementally populates a new InterpreterContext. Only the note id and the paragraph id
+   * can be set through this builder; every other field keeps the value the private
+   * no-arg constructor and the field initializers give it.
+   */
+  public static class Builder {
+    private final InterpreterContext instance = new InterpreterContext();
+
+    /** Stores {@code noteId} on the context under construction; returns this builder. */
+    public Builder setNoteId(String noteId) {
+      this.instance.noteId = noteId;
+      return this;
+    }
+
+    /** Stores {@code paragraphId} on the context under construction; returns this builder. */
+    public Builder setParagraphId(String paragraphId) {
+      this.instance.paragraphId = paragraphId;
+      return this;
+    }
+
+    /** Returns the context being built; every call returns the same instance. */
+    public InterpreterContext getContext() {
+      return this.instance;
+    }
+  }
+
+  /**
+   * Private no-arg constructor backing {@link Builder}. Fields declared with an initializer
+   * (config, gui, runners) start out populated; fields without one (e.g. out, noteId,
+   * progressMap) remain null until assigned.
+   */
+ private InterpreterContext() {
+
+ }
// visible for testing
public InterpreterContext(String noteId,
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index 24d5ee7..e8db0c5 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -93,6 +93,18 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
</exclusions>
</dependency>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java
index 7debf1b..3d1406a 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/AuthenticationIT.java
@@ -83,7 +83,7 @@ public class AuthenticationIT extends AbstractZeppelinIT {
}
try {
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
File file = new File(shiroPath);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java
index 7f8765f..999e796 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/InterpreterModeActionsIT.java
@@ -82,7 +82,7 @@ public class InterpreterModeActionsIT extends AbstractZeppelinIT {
return;
}
try {
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
interpreterOptionPath = conf.getRelativeDir(String.format("%s/interpreter.json", conf.getConfDir()));
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java
index b813ea9..dc07435 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/integration/PersonalizeActionsIT.java
@@ -74,7 +74,7 @@ public class PersonalizeActionsIT extends AbstractZeppelinIT {
return;
}
try {
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
shiroPath = conf.getRelativeDir(String.format("%s/shiro.ini", conf.getConfDir()));
File file = new File(shiroPath);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index e2f171f..7675cf6 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -126,8 +126,8 @@ public abstract class AbstractTestRestApi {
private static void start(boolean withAuth) throws Exception {
if (!wasRunning) {
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), "../");
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), "../zeppelin-web/dist");
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("../").getAbsolutePath());
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName(), new File("../zeppelin-web/dist").getAbsolutePath());
// some test profile does not build zeppelin-web.
// to prevent zeppelin starting up fail, create zeppelin-web/dist directory
@@ -211,7 +211,7 @@ public abstract class AbstractTestRestApi {
// set spark home for pyspark
sparkProperties.put("spark.home",
new InterpreterProperty("spark.home", getSparkHome(), InterpreterPropertyType.TEXTAREA.getValue()));
- sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
+ sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
sparkIntpSetting.setProperties(sparkProperties);
pySpark = true;
@@ -234,7 +234,7 @@ public abstract class AbstractTestRestApi {
new InterpreterProperty("spark.home", sparkHome, InterpreterPropertyType.TEXTAREA.getValue()));
sparkProperties.put("zeppelin.spark.useHiveContext",
new InterpreterProperty("zeppelin.spark.useHiveContext", false, InterpreterPropertyType.CHECKBOX.getValue()));
- sparkProperties.put("zeppelin.spark.useIPython", new InterpreterProperty("zeppelin.spark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
+ sparkProperties.put("zeppelin.pyspark.useIPython", new InterpreterProperty("zeppelin.pyspark.useIPython", "false", InterpreterPropertyType.TEXTAREA.getValue()));
pySpark = true;
sparkR = true;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 314ca18..c67df6b 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -36,7 +36,7 @@
<properties>
<!--library versions-->
- <hadoop.version>2.6.0</hadoop.version>
+ <hadoop.version>2.7.3</hadoop.version>
<commons.lang3.version>3.4</commons.lang3.version>
<commons.vfs2.version>2.0</commons.vfs2.version>
<aws.sdk.s3.version>1.10.62</aws.sdk.s3.version>
@@ -214,12 +214,6 @@
</exclusions>
</dependency>
- <dependency> <!-- because there are two of them above -->
- <groupId>xml-apis</groupId>
- <artifactId>xml-apis</artifactId>
- <version>${xml.apis.version}</version>
- </dependency>
-
<dependency>
<groupId>org.eclipse.jgit</groupId>
<artifactId>org.eclipse.jgit</artifactId>
@@ -299,21 +293,91 @@
</dependency>
<dependency>
- <groupId>org.mongodb</groupId>
- <artifactId>mongo-java-driver</artifactId>
- <version>3.4.1</version>
- </dependency>
- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.5</version>
</dependency>
<dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <version>3.4.1</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-webdav</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jgit</groupId>
+ <artifactId>org.eclipse.jgit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>xml-apis</groupId>
+ <artifactId>xml-apis</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
@@ -325,9 +389,90 @@
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-webdav</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jgit</groupId>
+ <artifactId>org.eclipse.jgit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>xml-apis</groupId>
+ <artifactId>xml-apis</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <!--<exclusion>-->
+ <!--<groupId>com.sun.jersey</groupId>-->
+ <!--<artifactId>jersey-core</artifactId>-->
+ <!--</exclusion>-->
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <!--<exclusion>-->
+ <!--<groupId>com.sun.jersey</groupId>-->
+ <!--<artifactId>jersey-server</artifactId>-->
+ <!--</exclusion>-->
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
@@ -349,8 +494,74 @@
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
+ <groupId>org.eclipse.jgit</groupId>
+ <artifactId>org.eclipse.jgit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.jcraft</groupId>
+ <artifactId>jsch</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>xml-apis</groupId>
+ <artifactId>xml-apis</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>xerces</groupId>
+ <artifactId>xercesImpl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-tests</artifactId>
+ <version>${hadoop.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <!--<exclusion>-->
+ <!--<groupId>com.sun.jersey</groupId>-->
+ <!--<artifactId>jersey-json</artifactId>-->
+ <!--</exclusion>-->
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-webdav</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jgit</groupId>
@@ -373,9 +584,50 @@
<artifactId>xercesImpl</artifactId>
</exclusion>
<exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-jaxrs</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-xc</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zeppelin</groupId>
+ <artifactId>zeppelin-spark_2.10</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-context</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
@@ -392,6 +644,12 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkMode>always</forkMode>
+ <systemProperties>
+ <java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
+ </systemProperties>
+ <environmentVariables>
+ <!--<ZEPPELIN_HOME>..</ZEPPELIN_HOME>-->
+ </environmentVariables>
</configuration>
</plugin>
</plugins>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index ba90ed8..2dec19c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -493,7 +493,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
}
+  /**
+   * Returns the ZEPPELIN_CONF_DIR setting passed through getRelativeDir.
+   * NOTE(review): presumably getRelativeDir resolves a relative value against ZEPPELIN_HOME,
+   * so callers get a usable path regardless of the working directory — confirm.
+   */
public String getConfDir() {
- return getString(ConfVars.ZEPPELIN_CONF_DIR);
+ return getRelativeDir(ConfVars.ZEPPELIN_CONF_DIR);
}
public List<String> getAllowedOrigins()
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 79618a3..9a453d8 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.interpreter;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.gson.JsonArray;
@@ -26,6 +27,7 @@ import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;
import com.google.gson.internal.StringMap;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.Dependency;
import org.apache.zeppelin.dep.DependencyResolver;
@@ -44,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
@@ -55,6 +58,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -452,7 +456,9 @@ public class InterpreterSetting {
Properties jProperties = new Properties();
Map<String, InterpreterProperty> iProperties = (Map<String, InterpreterProperty>) properties;
for (Map.Entry<String, InterpreterProperty> entry : iProperties.entrySet()) {
- jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString());
+ if (entry.getValue().getValue() != null) {
+ jProperties.setProperty(entry.getKey(), entry.getValue().getValue().toString());
+ }
}
if (!jProperties.containsKey("zeppelin.interpreter.output.limit")) {
@@ -707,22 +713,133 @@ public class InterpreterSetting {
interpreterRunner != null ? interpreterRunner.getPath() :
conf.getInterpreterRemoteRunnerPath(), conf.getCallbackPortRange(),
interpreterDir, localRepoPath,
- getEnvFromInterpreterProperty(getJavaProperties()), connectTimeout,
+ getEnvFromInterpreterProperty(), connectTimeout,
remoteInterpreterProcessListener, appEventListener, group);
}
return remoteInterpreterProcess;
}
- private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
- Map<String, String> env = new HashMap<>();
- for (Object key : property.keySet()) {
- if (RemoteInterpreterUtils.isEnvString((String) key)) {
- env.put((String) key, property.getProperty((String) key));
+  /**
+   * Tells whether a property should be forwarded to spark-submit as {@code --conf}: both the
+   * key and the value must be non-empty and the key must start with "spark.".
+   */
+  private boolean isSparkConf(String key, String value) {
+    if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+      return false;
+    }
+    return key.startsWith("spark.");
+  }
+
+  /**
+   * Builds the environment handed to the remote interpreter process.
+   *
+   * <p>Every property accepted by RemoteInterpreterUtils.isEnvString is copied verbatim.
+   * Every non-empty "spark.*" property is shell-quoted via toShellFormat and folded, together
+   * with {@code --master} (and {@code --files} in yarn-cluster mode), into the single
+   * ZEPPELIN_SPARK_CONF string. SPARK_YARN_CLUSTER=true is added for yarn-cluster.
+   * NOTE(review): nothing here restricts this to the spark interpreter, so every setting gets a
+   * ZEPPELIN_SPARK_CONF entry — confirm that is intended.
+   *
+   * @return environment variable name to value
+   */
+ private Map<String, String> getEnvFromInterpreterProperty() {
+ Map<String, String> env = new HashMap<String, String>();
+ Properties javaProperties = getJavaProperties();
+ Properties sparkProperties = new Properties();
+ String sparkMaster = getSparkMaster();
+ for (String key : javaProperties.stringPropertyNames()) {
+ if (RemoteInterpreterUtils.isEnvString(key)) {
+ env.put(key, javaProperties.getProperty(key));
}
+    // Values are quoted here so ZEPPELIN_SPARK_CONF can be re-parsed by a shell
+    // (presumably the launch script evaluates it — confirm).
+ if (isSparkConf(key, javaProperties.getProperty(key))) {
+ sparkProperties.setProperty(key, toShellFormat(javaProperties.getProperty(key)));
+ }
+ }
+
+ setupPropertiesForPySpark(sparkProperties);
+ setupPropertiesForSparkR(sparkProperties, javaProperties.getProperty("SPARK_HOME"));
+    // Flag yarn-cluster mode to the launched process.
+ if (isYarnMode() && getDeployMode().equals("cluster")) {
+ env.put("SPARK_YARN_CLUSTER", "true");
}
+
+ StringBuilder sparkConfBuilder = new StringBuilder();
+    // getSparkMaster() falls back to "local[*]", so this check currently always passes.
+ if (sparkMaster != null) {
+ sparkConfBuilder.append(" --master " + sparkMaster);
+ }
+    // Pass the yarn-cluster log4j config from the conf dir via --files.
+ if (isYarnMode() && getDeployMode().equals("cluster")) {
+ sparkConfBuilder.append(" --files " + conf.getConfDir() + "/log4j_yarn_cluster.properties");
+ }
+    // NOTE(review): property names, the --master value and the --files path are appended
+    // unquoted; only property values went through toShellFormat.
+ for (String name : sparkProperties.stringPropertyNames()) {
+ sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
+ }
+
+ env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
+ LOGGER.debug("getEnvFromInterpreterProperty: " + env);
return env;
}
+  /**
+   * Sets spark.yarn.isPython=true in {@code sparkProperties} when the master is YARN; does
+   * nothing otherwise.
+   * NOTE(review): applied to every interpreter setting, not only pyspark — confirm.
+   */
+  private void setupPropertiesForPySpark(Properties sparkProperties) {
+    if (!isYarnMode()) {
+      return;
+    }
+    sparkProperties.setProperty("spark.yarn.isPython", "true");
+  }
+
+  /**
+   * Appends {@code propertyValue} to a comma-separated property, or sets it when the
+   * property is not present yet.
+   */
+  private void mergeSparkProperty(Properties sparkProperties, String propertyName,
+                                  String propertyValue) {
+    String merged = sparkProperties.containsKey(propertyName)
+        ? sparkProperties.getProperty(propertyName) + "," + propertyValue
+        : propertyValue;
+    sparkProperties.setProperty(propertyName, merged);
+  }
+
+  /**
+   * Adds SparkR's sparkr.zip to spark.yarn.dist.archives when the file can be found.
+   *
+   * <p>With SPARK_HOME set, the archive is looked up under {@code $SPARK_HOME/R/lib}. Without
+   * it, only local masters are allowed and the copy under
+   * {@code $ZEPPELIN_HOME/interpreter/spark/R} is used.
+   *
+   * @param sparkProperties spark.* properties whose values are already shell-quoted
+   * @param sparkHome value of the SPARK_HOME property, may be null
+   * @throws RuntimeException if sparkHome is null and the master is not local
+   */
+  private void setupPropertiesForSparkR(Properties sparkProperties,
+                                        String sparkHome) {
+    File sparkRBasePath;
+    if (sparkHome == null) {
+      if (!getSparkMaster().startsWith("local")) {
+        throw new RuntimeException("SPARK_HOME is not specified for non-local mode");
+      }
+      String zeppelinHome = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
+      sparkRBasePath = new File(zeppelinHome,
+          "interpreter" + File.separator + "spark" + File.separator + "R");
+    } else {
+      sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
+    }
+
+    File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
+    if (sparkRPath.exists() && sparkRPath.isFile()) {
+      // Every other spark.* value in sparkProperties is already quoted with toShellFormat;
+      // quote the path too, so a path containing spaces is not split when ZEPPELIN_SPARK_CONF
+      // is parsed by the shell. Merging 'a' with '/p' yields 'a','/p', which the shell reads
+      // back as a,/p.
+      mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives",
+          toShellFormat(sparkRPath.getAbsolutePath()));
+    } else {
+      LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
+    }
+  }
+
+  /**
+   * Returns the Spark master. The "master" property wins, then "spark.master", then the
+   * fallback "local[*]".
+   */
+  private String getSparkMaster() {
+    Properties props = getJavaProperties();
+    String master = props.getProperty("master");
+    return master != null ? master : props.getProperty("spark.master", "local[*]");
+  }
+
+  /**
+   * Resolves the spark-submit deploy mode for the current master.
+   *
+   * <p>"yarn-client" and any "local..." master imply client mode, and "yarn-cluster" implies
+   * cluster mode. For every other master the value must come from spark.submit.deployMode.
+   *
+   * @return "client" or "cluster"
+   * @throws RuntimeException if spark.submit.deployMode is required but missing or invalid
+   */
+  private String getDeployMode() {
+    String master = getSparkMaster();
+    if (master.equals("yarn-client") || master.startsWith("local")) {
+      return "client";
+    }
+    if (master.equals("yarn-cluster")) {
+      return "cluster";
+    }
+    String deployMode = getJavaProperties().getProperty("spark.submit.deployMode");
+    if (deployMode == null) {
+      // Report the actual master: this branch is reached for any master that is neither
+      // local nor yarn-client/yarn-cluster, not only for plain "yarn".
+      throw new RuntimeException("master is set as " + master
+          + ", but spark.submit.deployMode is not specified");
+    }
+    if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
+      throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
+    }
+    return deployMode;
+  }
+
+  /** Returns true when the resolved master starts with "yarn" (yarn, yarn-client, yarn-cluster). */
+  private boolean isYarnMode() {
+    String master = getSparkMaster();
+    return master.startsWith("yarn");
+  }
+
+  /**
+   * Quotes {@code value} so that a POSIX shell reads it back as one literal word.
+   *
+   * <p>The value is wrapped in single quotes, inside which the shell performs no expansion.
+   * Each embedded single quote is written as {@code '\''}: close the quote, add an escaped
+   * quote, then reopen. Unlike wrapping in double quotes, this keeps {@code $}, backticks and
+   * backslashes literal, and it also accepts values that contain both quote characters.
+   * Values without a single quote are quoted exactly as before ({@code 'value'}).
+   *
+   * @param value raw property value, non-null
+   * @return the shell-quoted value
+   */
+  private String toShellFormat(String value) {
+    return "'" + value.replace("'", "'\\''") + "'";
+  }
+
private List<Interpreter> getOrCreateSession(String user, String noteId) {
ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId);
Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user {}, " +
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
index 585a58a..73babab 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java
@@ -142,7 +142,7 @@ public class InterpreterSettingManager {
this.interpreterDirPath = Paths.get(conf.getInterpreterDir());
LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath);
this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath());
- LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath);
+ LOGGER.debug("InterpreterSettingPath: {}", interpreterSettingPath);
this.dependencyResolver = new DependencyResolver(
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
this.interpreterRepositories = dependencyResolver.getRepos();
@@ -283,7 +283,7 @@ public class InterpreterSettingManager {
private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
String interpreterJson) throws IOException {
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
- ClassLoader tempClassLoader = new URLClassLoader(urls, cl);
+ ClassLoader tempClassLoader = new URLClassLoader(urls, null);
URL url = tempClassLoader.getResource(interpreterJson);
if (url == null) {
@@ -392,6 +392,17 @@ public class InterpreterSettingManager {
return settings;
}
+  /**
+   * Looks up a registered interpreter setting by name.
+   *
+   * @param name setting name, compared with equals against InterpreterSetting.getName()
+   * @return the first matching setting encountered while iterating interpreterSettings
+   * @throws RuntimeException if no registered setting has that name
+   */
+  public InterpreterSetting getInterpreterSettingByName(String name) {
+    InterpreterSetting match = null;
+    synchronized (interpreterSettings) {
+      for (InterpreterSetting candidate : interpreterSettings.values()) {
+        if (candidate.getName().equals(name)) {
+          match = candidate;
+          break;
+        }
+      }
+    }
+    if (match == null) {
+      throw new RuntimeException("No such interpreter setting: " + name);
+    }
+    return match;
+  }
+
public ManagedInterpreterGroup getInterpreterGroupById(String groupId) {
for (InterpreterSetting setting : interpreterSettings.values()) {
ManagedInterpreterGroup interpreterGroup = setting.getInterpreterGroup(groupId);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
index ca23bcf..35b6b6c 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -254,7 +254,11 @@ public class RemoteInterpreterEventPoller extends Thread {
try {
clearUnreadEvents(interpreterProcess.getClient());
} catch (Exception e1) {
- logger.error("Can't get RemoteInterpreterEvent", e1);
+ if (shutdown) {
+ logger.error("Can not get RemoteInterpreterEvent because it is shutdown.");
+ } else {
+ logger.error("Can't get RemoteInterpreterEvent", e1);
+ }
}
if (appendFuture != null) {
appendFuture.cancel(true);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
index 2d64831..d21a962 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java
@@ -226,6 +226,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
}
public void stop() {
+ // shutdown EventPoller first.
+ this.remoteInterpreterEventPoller.shutdown();
if (callbackServer.isServing()) {
callbackServer.stop();
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index d34c538..e45f15b 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -32,7 +32,7 @@ public abstract class RemoteInterpreterProcess {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
private GenericObjectPool<Client> clientPool;
- private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
+ protected final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
private int connectTimeout;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
index 21d7526..9ab2137 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java
@@ -25,14 +25,10 @@ import static org.mockito.Mockito.mock;
*/
public abstract class AbstractInterpreterTest {
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractInterpreterTest.class);
- private static final String INTERPRETER_SCRIPT =
- System.getProperty("os.name").startsWith("Windows") ?
- "../bin/interpreter.cmd" :
- "../bin/interpreter.sh";
protected InterpreterSettingManager interpreterSettingManager;
protected InterpreterFactory interpreterFactory;
- protected File testRootDir;
+ protected File zeppelinHome;
protected File interpreterDir;
protected File confDir;
protected File notebookDir;
@@ -41,12 +37,11 @@ public abstract class AbstractInterpreterTest {
@Before
public void setUp() throws Exception {
// copy the resources files to a temp folder
- testRootDir = new File(System.getProperty("java.io.tmpdir") + "/Zeppelin_Test_" + System.currentTimeMillis());
- testRootDir.mkdirs();
- LOGGER.info("Create tmp directory: {} as root folder of ZEPPELIN_INTERPRETER_DIR & ZEPPELIN_CONF_DIR", testRootDir.getAbsolutePath());
- interpreterDir = new File(testRootDir, "interpreter");
- confDir = new File(testRootDir, "conf");
- notebookDir = new File(testRootDir, "notebook");
+ zeppelinHome = new File("..");
+ LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
+ interpreterDir = new File(zeppelinHome, "interpreter_" + getClass().getSimpleName());
+ confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName());
+ notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName());
interpreterDir.mkdirs();
confDir.mkdirs();
@@ -55,10 +50,10 @@ public abstract class AbstractInterpreterTest {
FileUtils.copyDirectory(new File("src/test/resources/interpreter"), interpreterDir);
FileUtils.copyDirectory(new File("src/test/resources/conf"), confDir);
+ System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), zeppelinHome.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath());
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath());
- System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER.getVarName(), INTERPRETER_SCRIPT);
conf = new ZeppelinConfiguration();
interpreterSettingManager = new InterpreterSettingManager(conf,
@@ -69,6 +64,8 @@ public abstract class AbstractInterpreterTest {
@After
public void tearDown() throws Exception {
interpreterSettingManager.close();
- FileUtils.deleteDirectory(testRootDir);
+ FileUtils.deleteDirectory(interpreterDir);
+ FileUtils.deleteDirectory(confDir);
+ FileUtils.deleteDirectory(notebookDir);
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java
new file mode 100644
index 0000000..619d01a
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniHadoopCluster.java
@@ -0,0 +1,114 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+
+/**
+ * Util class for creating a mini Hadoop cluster (HDFS + YARN) on the local machine, for testing
+ * scenarios that need a Hadoop cluster.
+ *
+ * <p>This is a plain helper, not a JUnit test class: callers must invoke {@link #start()} and
+ * {@link #stop()} explicitly (e.g. from their own static {@code @BeforeClass} /
+ * {@code @AfterClass} methods). Client configuration files are written to
+ * {@link #getConfigPath()} so that directory can be used as {@code HADOOP_CONF_DIR}.
+ */
+public class MiniHadoopCluster {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MiniHadoopCluster.class);
+
+  /** How long {@link #start()} waits for the ResourceManager to publish a real RPC port. */
+  private static final long RM_STARTUP_TIMEOUT_MS = 30 * 1000L;
+
+  /** Polling interval used while waiting for the ResourceManager. */
+  private static final long RM_POLL_INTERVAL_MS = 100L;
+
+  private Configuration hadoopConf;
+  private MiniDFSCluster dfsCluster;
+  private MiniYARNCluster yarnCluster;
+  private final String configPath = new File("target/tests/hadoop_conf").getAbsolutePath();
+
+  /**
+   * Starts a MiniDFSCluster with 2 data nodes, then a MiniYARNCluster with 2 node managers, and
+   * writes {@code core-site.xml} and {@code yarn-site.xml} into {@link #getConfigPath()}.
+   *
+   * @throws IOException if HDFS cannot be started, the ResourceManager does not publish a non-zero
+   *     port within 30 seconds, the wait is interrupted, or a configuration file cannot be written
+   */
+  public void start() throws IOException {
+    LOGGER.info("Starting MiniHadoopCluster ...");
+    this.hadoopConf = new Configuration();
+    new File(configPath).mkdirs();
+
+    // start MiniDFSCluster
+    this.dfsCluster = new MiniDFSCluster.Builder(hadoopConf)
+        .numDataNodes(2)
+        .format(true)
+        .waitSafeMode(true)
+        .build();
+    this.dfsCluster.waitActive();
+    saveConfig(hadoopConf, configPath + "/core-site.xml");
+
+    // start MiniYarnCluster: 2 node managers, each with 1 local dir and 1 log dir
+    YarnConfiguration baseConfig = new YarnConfiguration(hadoopConf);
+    this.yarnCluster = new MiniYARNCluster(getClass().getName(), 2, 1, 1);
+    yarnCluster.init(baseConfig);
+
+    // Install a shutdown hook so the YARN cluster is stopped even if stop() is never called.
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        yarnCluster.stop();
+      }
+    });
+
+    yarnCluster.start();
+
+    // Workaround for YARN-2642: start() may return while RM_ADDRESS still carries port 0,
+    // so poll until the ResourceManager has published a real port.
+    Configuration yarnConfig = yarnCluster.getConfig();
+    long deadline = System.currentTimeMillis() + RM_STARTUP_TIMEOUT_MS;
+    while (!isRmPortAssigned(yarnConfig) && System.currentTimeMillis() < deadline) {
+      try {
+        Thread.sleep(RM_POLL_INTERVAL_MS);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers further up can still observe it.
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+    if (!isRmPortAssigned(yarnConfig)) {
+      throw new IOException("RM not up yet");
+    }
+
+    LOGGER.info("RM address in configuration is "
+        + yarnConfig.get(YarnConfiguration.RM_ADDRESS));
+    saveConfig(yarnConfig, configPath + "/yarn-site.xml");
+  }
+
+  /**
+   * Returns whether the RM_ADDRESS value in {@code conf} ends with a port other than the
+   * placeholder {@code 0}. A missing value counts as "not assigned".
+   */
+  private static boolean isRmPortAssigned(Configuration conf) {
+    String rmAddress = conf.get(YarnConfiguration.RM_ADDRESS);
+    if (rmAddress == null) {
+      return false;
+    }
+    int colon = rmAddress.lastIndexOf(':');
+    return colon >= 0 && !rmAddress.substring(colon + 1).equals("0");
+  }
+
+  /**
+   * Writes {@code conf} as a Hadoop XML configuration file at {@code dest}, after removing
+   * settings that reference classes only available inside this test JVM.
+   *
+   * @param conf configuration to persist; it is copied, not modified
+   * @param dest path of the XML file to (over)write
+   * @throws IOException if the file cannot be written
+   */
+  protected void saveConfig(Configuration conf, String dest) throws IOException {
+    Configuration redacted = new Configuration(conf);
+    // This setting references a test class that is not available when using a real Spark
+    // installation, so remove it from client configs.
+    redacted.unset("net.topology.node.switch.mapping.impl");
+
+    try (FileOutputStream out = new FileOutputStream(dest)) {
+      redacted.writeXml(out);
+    }
+    LOGGER.info("Save configuration to " + dest);
+  }
+
+  /**
+   * Stops the YARN cluster and then shuts down HDFS. Safe to call when {@link #start()} failed
+   * part-way or was never called; HDFS is shut down even if stopping YARN throws.
+   */
+  public void stop() {
+    try {
+      if (this.yarnCluster != null) {
+        this.yarnCluster.stop();
+      }
+    } finally {
+      if (this.dfsCluster != null) {
+        this.dfsCluster.shutdown();
+      }
+    }
+  }
+
+  /** Returns the absolute path of the directory holding core-site.xml and yarn-site.xml. */
+  public String getConfigPath() {
+    return configPath;
+  }
+
+  /** Returns the underlying MiniYARNCluster, or null if {@link #start()} has not run. */
+  public MiniYARNCluster getYarnCluster() {
+    return yarnCluster;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
new file mode 100644
index 0000000..923ae5a
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java
@@ -0,0 +1,68 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Minimal in-process Zeppelin environment for integration tests.
+ *
+ * <p>{@link #start()} uses the parent of the current working directory as ZEPPELIN_HOME (assumed
+ * to be the source checkout root when tests run from a module directory; verify for new callers).
+ * It creates per-class conf/notebook directories there, copies the log4j configs into them, and
+ * builds an {@link InterpreterSettingManager} and {@link InterpreterFactory} backed by mock
+ * listeners. {@link #stop()} closes the manager and deletes those directories.
+ */
+public class MiniZeppelin {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(MiniZeppelin.class);
+
+  protected InterpreterSettingManager interpreterSettingManager;
+  protected InterpreterFactory interpreterFactory;
+  protected File zeppelinHome;
+  private File confDir;
+  private File notebookDir;
+  protected ZeppelinConfiguration conf;
+
+  /**
+   * Sets the ZEPPELIN_HOME / ZEPPELIN_CONF_DIR / ZEPPELIN_NOTEBOOK_DIR system properties, prepares
+   * the conf and notebook directories, and creates the interpreter setting manager and factory.
+   *
+   * @throws IOException if the log4j configuration files cannot be copied
+   */
+  public void start() throws IOException {
+    zeppelinHome = new File("..");
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
+        zeppelinHome.getAbsolutePath());
+    confDir = new File(zeppelinHome, "conf_" + getClass().getSimpleName());
+    notebookDir = new File(zeppelinHome, "notebook_" + getClass().getSimpleName());
+    confDir.mkdirs();
+    notebookDir.mkdirs();
+    LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome.getAbsolutePath());
+    FileUtils.copyFile(new File(zeppelinHome, "conf/log4j.properties"),
+        new File(confDir, "log4j.properties"));
+    FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"),
+        new File(confDir, "log4j_yarn_cluster.properties"));
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(),
+        confDir.getAbsolutePath());
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(),
+        notebookDir.getAbsolutePath());
+    conf = new ZeppelinConfiguration();
+    interpreterSettingManager = new InterpreterSettingManager(conf,
+        mock(AngularObjectRegistryListener.class),
+        mock(RemoteInterpreterProcessListener.class),
+        mock(ApplicationEventListener.class));
+    interpreterFactory = new InterpreterFactory(interpreterSettingManager);
+  }
+
+  /**
+   * Closes the interpreter setting manager and deletes the conf and notebook directories.
+   *
+   * <p>Safe to call when {@link #start()} failed part-way: anything not yet created is skipped,
+   * and the directories are deleted even if closing the manager throws.
+   *
+   * @throws IOException if a directory cannot be deleted
+   */
+  public void stop() throws IOException {
+    try {
+      if (interpreterSettingManager != null) {
+        interpreterSettingManager.close();
+      }
+    } finally {
+      try {
+        if (confDir != null) {
+          FileUtils.deleteDirectory(confDir);
+        }
+      } finally {
+        if (notebookDir != null) {
+          FileUtils.deleteDirectory(notebookDir);
+        }
+      }
+    }
+  }
+
+  /** Returns the directory used as ZEPPELIN_HOME, or null before {@link #start()}. */
+  public File getZeppelinHome() {
+    return zeppelinHome;
+  }
+
+  /** Returns the per-class conf directory, or null before {@link #start()}. */
+  public File getZeppelinConfDir() {
+    return confDir;
+  }
+
+  /** Returns the interpreter factory created by {@link #start()}. */
+  public InterpreterFactory getInterpreterFactory() {
+    return interpreterFactory;
+  }
+
+  /** Returns the interpreter setting manager created by {@link #start()}. */
+  public InterpreterSettingManager getInterpreterSettingManager() {
+    return interpreterSettingManager;
+  }
+}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5d715109/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
new file mode 100644
index 0000000..24a9aee
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkInterpreterModeTest.java
@@ -0,0 +1,147 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for the Spark interpreter group in local, yarn-client and yarn-cluster mode.
+ *
+ * <p>A {@link MiniHadoopCluster} and a {@link MiniZeppelin} are shared by all tests. SPARK_HOME is
+ * read from the environment, and the yarn tests additionally need {@code which python} to succeed.
+ */
+public class SparkInterpreterModeTest {
+
+  private static MiniHadoopCluster hadoopCluster;
+  private static MiniZeppelin zeppelin;
+  private static InterpreterFactory interpreterFactory;
+  private static InterpreterSettingManager interpreterSettingManager;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    hadoopCluster = new MiniHadoopCluster();
+    hadoopCluster.start();
+
+    zeppelin = new MiniZeppelin();
+    zeppelin.start();
+    interpreterFactory = zeppelin.getInterpreterFactory();
+    interpreterSettingManager = zeppelin.getInterpreterSettingManager();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    // Stop the Hadoop cluster even if shutting down Zeppelin fails.
+    try {
+      if (zeppelin != null) {
+        zeppelin.stop();
+      }
+    } finally {
+      if (hadoopCluster != null) {
+        hadoopCluster.stop();
+      }
+    }
+  }
+
+  /**
+   * Smoke-tests the spark, pyspark, ipyspark and sql interpreters bound to note1. The pyspark step
+   * registers the temp table "test" that the ipyspark and sql steps read.
+   */
+  private void testInterpreterBasics() throws IOException {
+    // test SparkInterpreter
+    interpreterSettingManager.setInterpreterBinding("user1", "note1",
+        interpreterSettingManager.getInterpreterSettingIds());
+    Interpreter sparkInterpreter =
+        interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
+
+    InterpreterContext context = new InterpreterContext.Builder()
+        .setNoteId("note1")
+        .setParagraphId("paragraph_1")
+        .getContext();
+    InterpreterResult interpreterResult = sparkInterpreter.interpret("sc.version", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    interpreterResult = sparkInterpreter.interpret("sc.range(1,10).sum()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertTrue(interpreterResult.message().get(0).getData().contains("45"));
+
+    // test PySparkInterpreter
+    Interpreter pySparkInterpreter =
+        interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark");
+    interpreterResult = pySparkInterpreter.interpret(
+        "sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')",
+        context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+
+    // test IPySparkInterpreter
+    Interpreter ipySparkInterpreter =
+        interpreterFactory.getInterpreter("user1", "note1", "spark.ipyspark");
+    interpreterResult = ipySparkInterpreter.interpret("sqlContext.table('test').show()", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+
+    // test SparkSQLInterpreter
+    Interpreter sqlInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.sql");
+    interpreterResult = sqlInterpreter.interpret("select count(1) from test", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
+    assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
+    assertEquals("count(1)\n2\n", interpreterResult.message().get(0).getData());
+  }
+
+  /**
+   * Applies the properties shared by every mode under test to the "spark" setting.
+   *
+   * @param master value for the {@code master} property, e.g. "local[*]" or "yarn-client"
+   * @return the configured setting, for mode-specific additions
+   */
+  private InterpreterSetting configureSparkSetting(String master) {
+    InterpreterSetting sparkInterpreterSetting =
+        interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("master", master);
+    sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
+    sparkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR",
+        zeppelin.getZeppelinConfDir().getAbsolutePath());
+    sparkInterpreterSetting.setProperty("zeppelin.spark.useHiveContext", "false");
+    sparkInterpreterSetting.setProperty("zeppelin.pyspark.useIPython", "false");
+    return sparkInterpreterSetting;
+  }
+
+  /** Returns how many applications the mini ResourceManager currently reports as RUNNING. */
+  private int getRunningYarnApplicationCount() throws IOException, YarnException {
+    GetApplicationsRequest request =
+        GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager()
+        .getClientRMService().getApplications(request);
+    return response.getApplicationList().size();
+  }
+
+  @Test
+  public void testLocalMode() throws IOException, YarnException {
+    configureSparkSetting("local[*]");
+
+    try {
+      testInterpreterBasics();
+
+      // no yarn application launched
+      assertEquals(0, getRunningYarnApplicationCount());
+    } finally {
+      // Always release the interpreters so a failure here does not leak into later tests.
+      interpreterSettingManager.close();
+    }
+  }
+
+  @Test
+  public void testYarnClientMode() throws IOException, YarnException, InterruptedException {
+    InterpreterSetting sparkInterpreterSetting = configureSparkSetting("yarn-client");
+    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    sparkInterpreterSetting.setProperty("PYSPARK_PYTHON", getPythonExec());
+    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
+
+    try {
+      testInterpreterBasics();
+
+      // 1 yarn application launched
+      assertEquals(1, getRunningYarnApplicationCount());
+    } finally {
+      // Always release the interpreters; otherwise a failed assertion could leave a YARN
+      // application running and skew the next test's application count.
+      interpreterSettingManager.close();
+    }
+  }
+
+  @Test
+  public void testYarnClusterMode() throws IOException, YarnException, InterruptedException {
+    InterpreterSetting sparkInterpreterSetting = configureSparkSetting("yarn-cluster");
+    sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    sparkInterpreterSetting.setProperty("spark.pyspark.python", getPythonExec());
+    sparkInterpreterSetting.setProperty("spark.driver.memory", "512m");
+
+    try {
+      testInterpreterBasics();
+
+      // 1 yarn application launched
+      assertEquals(1, getRunningYarnApplicationCount());
+    } finally {
+      // Always release the interpreters; otherwise a failed assertion could leave a YARN
+      // application running and skew the next test's application count.
+      interpreterSettingManager.close();
+    }
+  }
+
+  /**
+   * Resolves the absolute path of the {@code python} executable via {@code which python}.
+   *
+   * @throws RuntimeException if {@code which} exits with a non-zero status
+   */
+  private String getPythonExec() throws IOException, InterruptedException {
+    Process process = Runtime.getRuntime().exec(new String[]{"which", "python"});
+    // Drain stdout before waiting so the child can never block on a full pipe buffer.
+    String output = IOUtils.toString(process.getInputStream(), "UTF-8");
+    if (process.waitFor() != 0) {
+      throw new RuntimeException("Fail to run command: which python.");
+    }
+    return output.trim();
+  }
+}