You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2020/11/06 10:25:10 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5070] Improve
start/shutdown and signal handling
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 9d5d0f0 [ZEPPELIN-5070] Improve start/shutdown and signal handling
9d5d0f0 is described below
commit 9d5d0f01c446fdfb16738e4146b8f9042ee12d9e
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Wed Oct 21 16:55:22 2020 +0200
[ZEPPELIN-5070] Improve start/shutdown and signal handling
### What is this PR for?
This PR touches the start and stop procedures of all interpreters.
- improved start script with [shellcheck](https://www.shellcheck.net/) recommendations
- Use `exec [...]` instead of `eval [..] &`, which means that the Java interpreter process is not a fork of the shell script
-> no trap handling required in the start script
-> signals land in the JVM
- Correct escaping is done by the start-script
- remove anonymous threads in `RemoteInterpreterServer.java` and give the threads meaningful names
- Use TServer's StopTimeoutVal and stopTimeoutUnit for faster shutdown
- remove special K8s shutdown handling
- unregister interpreter during unpredictable shutdown
### What type of PR is it?
- Improvement
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5070
### How should this be tested?
* Travis-CI: https://travis-ci.com/github/Reamer/zeppelin/builds/198263926
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Philipp Dallig <ph...@gmail.com>
Closes #3925 from Reamer/signal_start_stop_handling and squashes the following commits:
c4da96114 [Philipp Dallig] Add logback.xml for S3 tests to disable debug log messages
6fbb9735e [Philipp Dallig] Escaping is done in start-script
bc2f7dc46 [Philipp Dallig] Start ThriftServer in Run-Thread RemoteInterpreterServer-Thread and close the TServerSocket
33d951c3b [Philipp Dallig] Use args instead of command and start zeppelin-interpreter application in tini
2626af28c [Philipp Dallig] Remove special signal handling in K8s
683345de7 [Philipp Dallig] Use exec instead of eval to start interpreter application, no need to fork
b5c9bb0c0 [Philipp Dallig] Use glob instead of find - SC2044
7766f79c5 [Philipp Dallig] Use { ..; } instead of (..) to avoid subshell overhead. - SC2235
2194adade [Philipp Dallig] Declare and assign separately to avoid masking return values - SC2155
d28abe167 [Philipp Dallig] Remove quotes from lofgiles
143412181 [Philipp Dallig] Double quote to prevent globbing and word splitting - SC2086
3597c0fd8 [Philipp Dallig] remove unnecessary function calls or move to new $() style
ee3cbdad3 [Philipp Dallig] Use "-n" instead of "! -z" SC2236
1a8e3a81b [Philipp Dallig] Improve py4j pattern
13bbfecd4 [Philipp Dallig] cleanup and style changes
282f16732 [Philipp Dallig] Indicate a force shutting down with an other status code
bce7cc6cb [Philipp Dallig] Add stopTimeoutVal and stopTimeoutUnit to TServer
3d9c36487 [Philipp Dallig] Remove sun.misc.Signal and sun.misc.SignalHandler, which prints compilation warnings
05201c40b [Philipp Dallig] Add a RegisterRunnable
(cherry picked from commit e4eaa5ee974465b24c2f3bf89ce14040d3559f7b)
Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
bin/common.sh | 27 +-
bin/interpreter.sh | 79 ++---
k8s/interpreter/100-interpreter-spec.yaml | 22 +-
scripts/docker/zeppelin-interpreter/Dockerfile | 3 +-
.../remote/RemoteInterpreterServer.java | 332 ++++++++++++---------
.../interpreter/remote/RemoteInterpreterUtils.java | 36 +--
.../zeppelin/interpreter/remote/YarnUtils.java | 4 +-
.../zeppelin/interpreter/util/ProcessLauncher.java | 11 +-
.../remote/RemoteInterpreterUtilsTest.java | 15 +-
.../notebookrepo/s3/src/test/resources/logback.xml | 28 ++
.../interpreter/RemoteInterpreterEventServer.java | 23 +-
.../launcher/SparkInterpreterLauncher.java | 8 +-
.../launcher/SparkInterpreterLauncherTest.java | 24 +-
.../org/apache/zeppelin/notebook/NotebookTest.java | 2 +
14 files changed, 319 insertions(+), 295 deletions(-)
diff --git a/bin/common.sh b/bin/common.sh
index efb2ebe..6b8a4bc 100644
--- a/bin/common.sh
+++ b/bin/common.sh
@@ -16,8 +16,8 @@
# limitations under the License.
#
-if [ -L ${BASH_SOURCE-$0} ]; then
- FWDIR=$(dirname $(readlink "${BASH_SOURCE-$0}"))
+if [ -L "${BASH_SOURCE-$0}" ]; then
+ FWDIR=$(dirname "$(readlink "${BASH_SOURCE-$0}")")
else
FWDIR=$(dirname "${BASH_SOURCE-$0}")
fi
@@ -25,7 +25,8 @@ fi
if [[ -z "${ZEPPELIN_HOME}" ]]; then
# Make ZEPPELIN_HOME look cleaner in logs by getting rid of the
# extra ../
- export ZEPPELIN_HOME="$(cd "${FWDIR}/.."; pwd)"
+ ZEPPELIN_HOME="$(cd "${FWDIR}/.." || exit; pwd)"
+ export ZEPPELIN_HOME
fi
if [[ -z "${ZEPPELIN_CONF_DIR}" ]]; then
@@ -44,7 +45,8 @@ if [[ -z "${ZEPPELIN_WAR}" ]]; then
if [[ -d "${ZEPPELIN_HOME}/zeppelin-web/dist" ]]; then
export ZEPPELIN_WAR="${ZEPPELIN_HOME}/zeppelin-web/dist"
else
- export ZEPPELIN_WAR=$(find -L "${ZEPPELIN_HOME}" -name "zeppelin-web-[0-9]*.war")
+ ZEPPELIN_WAR=$(find -L "${ZEPPELIN_HOME}" -name "zeppelin-web-[0-9]*.war")
+ export ZEPPELIN_WAR
fi
fi
@@ -52,7 +54,8 @@ if [[ -z "${ZEPPELIN_ANGULAR_WAR}" ]]; then
if [[ -d "${ZEPPELIN_HOME}/zeppelin-web/dist" ]]; then
export ZEPPELIN_ANGULAR_WAR="${ZEPPELIN_HOME}/zeppelin-web-angular/dist/zeppelin"
else
- export ZEPPELIN_ANGULAR_WAR=$(find -L "${ZEPPELIN_HOME}" -name "zeppelin-web-angular*.war")
+ ZEPPELIN_ANGULAR_WAR=$(find -L "${ZEPPELIN_HOME}" -name "zeppelin-web-angular*.war")
+ export ZEPPELIN_ANGULAR_WAR
fi
fi
@@ -70,7 +73,7 @@ function check_java_version() {
JVM_VERSION=$(echo "$jvmver"|sed -e 's|^1\.\([0-9][0-9]*\)\..*$|\1|')
fi
- if [ "$JVM_VERSION" -lt 8 ] || ([ "$JVM_VERSION" -eq 8 ] && [ "${jvmver#*_}" -lt 151 ]) ; then
+ if [ "$JVM_VERSION" -lt 8 ] || { [ "$JVM_VERSION" -eq 8 ] && [ "${jvmver#*_}" -lt 151 ]; } ; then
echo "Apache Zeppelin requires either Java 8 update 151 or newer"
exit 1;
fi
@@ -78,7 +81,7 @@ function check_java_version() {
function addEachJarInDir(){
if [[ -d "${1}" ]]; then
- for jar in $(find -L "${1}" -maxdepth 1 -name '*jar'); do
+ for jar in "${1}"/*.jar ; do
ZEPPELIN_CLASSPATH="$jar:$ZEPPELIN_CLASSPATH"
done
fi
@@ -86,7 +89,7 @@ function addEachJarInDir(){
function addEachJarInDirRecursive(){
if [[ -d "${1}" ]]; then
- for jar in $(find -L "${1}" -type f -name '*jar'); do
+ for jar in "${1}"/**/*.jar ; do
ZEPPELIN_CLASSPATH="$jar:$ZEPPELIN_CLASSPATH"
done
fi
@@ -94,7 +97,7 @@ function addEachJarInDirRecursive(){
function addEachJarInDirRecursiveForIntp(){
if [[ -d "${1}" ]]; then
- for jar in ${1}/*.jar; do
+ for jar in "${1}"/*.jar; do
ZEPPELIN_INTP_CLASSPATH="$jar:${ZEPPELIN_INTP_CLASSPATH}"
done
fi
@@ -120,7 +123,7 @@ function getZeppelinVersion(){
fi
addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib"
CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
- $ZEPPELIN_RUNNER -cp $CLASSPATH $ZEPPELIN_COMMANDLINE_MAIN -v
+ $ZEPPELIN_RUNNER -cp "${CLASSPATH}" "${ZEPPELIN_COMMANDLINE_MAIN}" -v
exit 0
}
@@ -149,9 +152,9 @@ export JAVA_OPTS
JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
if [[ -n "${ZEPPELIN_IN_DOCKER}" ]]; then
- JAVA_INTP_OPTS+=" -Dlog4j.configuration='file://${ZEPPELIN_CONF_DIR}/log4j_docker.properties' -Dlog4j.configurationFile='file://${ZEPPELIN_CONF_DIR}/log4j2_docker.properties'"
+ JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j_docker.properties -Dlog4j.configurationFile=file://${ZEPPELIN_CONF_DIR}/log4j2_docker.properties"
elif [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
- JAVA_INTP_OPTS+=" -Dlog4j.configuration='file://${ZEPPELIN_CONF_DIR}/log4j.properties' -Dlog4j.configurationFile='file://${ZEPPELIN_CONF_DIR}/log4j2.properties'"
+ JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties -Dlog4j.configurationFile=file://${ZEPPELIN_CONF_DIR}/log4j2.properties"
else
JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
fi
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index d1d6315..2eac09d 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -35,7 +35,7 @@ if [ -f /proc/self/cgroup ] && [ -n "$(command -v getent)" ]; then
set +e
uidentry="$(getent passwd "$myuid")"
set -e
-
+
# If there is no passwd entry for the container UID, attempt to create one
if [ -z "$uidentry" ] ; then
if [ -w /etc/passwd ] ; then
@@ -129,22 +129,22 @@ ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/zeppelin-interpreter-${INTERPRETER_GROUP_I
if [[ -z "$ZEPPELIN_IMPERSONATE_CMD" ]]; then
if [[ "${INTERPRETER_ID}" != "spark" || "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" == "false" ]]; then
- ZEPPELIN_IMPERSONATE_RUN_CMD=`echo "ssh ${ZEPPELIN_IMPERSONATE_USER}@localhost" `
+ ZEPPELIN_IMPERSONATE_RUN_CMD=("ssh" "${ZEPPELIN_IMPERSONATE_USER}@localhost")
fi
else
ZEPPELIN_IMPERSONATE_RUN_CMD=$(eval "echo ${ZEPPELIN_IMPERSONATE_CMD} ")
fi
-if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]]; then
+if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]]; then
ZEPPELIN_LOGFILE+="${ZEPPELIN_IMPERSONATE_USER}-"
fi
ZEPPELIN_LOGFILE+="${ZEPPELIN_IDENT_STRING}-${HOSTNAME}.log"
-JAVA_INTP_OPTS+=" -Dzeppelin.log.file='${ZEPPELIN_LOGFILE}'"
+JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
echo "Log dir doesn't exist, create ${ZEPPELIN_LOG_DIR}"
- $(mkdir -p "${ZEPPELIN_LOG_DIR}")
+ mkdir -p "${ZEPPELIN_LOG_DIR}"
fi
# set spark related env variables
@@ -152,16 +152,15 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
# run kinit
if [[ -n "${ZEPPELIN_SERVER_KERBEROS_KEYTAB}" ]] && [[ -n "${ZEPPELIN_SERVER_KERBEROS_PRINCIPAL}" ]]; then
- kinit -kt ${ZEPPELIN_SERVER_KERBEROS_KEYTAB} ${ZEPPELIN_SERVER_KERBEROS_PRINCIPAL}
+ kinit -kt "${ZEPPELIN_SERVER_KERBEROS_KEYTAB}" "${ZEPPELIN_SERVER_KERBEROS_PRINCIPAL}"
fi
if [[ -n "${SPARK_HOME}" ]]; then
export SPARK_SUBMIT="${SPARK_HOME}/bin/spark-submit"
- SPARK_APP_JAR="$(ls ${ZEPPELIN_HOME}/interpreter/spark/spark-interpreter*.jar)"
+ SPARK_APP_JAR="$(ls "${ZEPPELIN_HOME}"/interpreter/spark/spark-interpreter*.jar)"
# This will evantually passes SPARK_APP_JAR to classpath of SparkIMain
ZEPPELIN_INTP_CLASSPATH+=":${SPARK_APP_JAR}"
- pattern="$SPARK_HOME/python/lib/py4j-*-src.zip"
- py4j=($pattern)
+ py4j=("${SPARK_HOME}"/python/lib/py4j-*-src.zip)
# pick the first match py4j zip - there should only be one
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
export PYTHONPATH="${py4j[0]}:$PYTHONPATH"
@@ -178,8 +177,7 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
addJarInDirForIntp "${INTERPRETER_DIR}/dep"
- pattern="${ZEPPELIN_HOME}/interpreter/spark/pyspark/py4j-*-src.zip"
- py4j=($pattern)
+ py4j=("${ZEPPELIN_HOME}"/interpreter/spark/pyspark/py4j-*-src.zip)
# pick the first match py4j zip - there should only be one
PYSPARKPATH="${ZEPPELIN_HOME}/interpreter/spark/pyspark/pyspark.zip:${py4j[0]}"
@@ -246,14 +244,14 @@ elif [[ "${INTERPRETER_ID}" == "flink" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
# Don't use `hadoop classpath` if flink-hadoop-shaded in in lib folder
flink_hadoop_shaded_jar=$(find "${FLINK_HOME}/lib" -name 'flink-shaded-hadoop-*.jar')
- if [[ ! -z "$flink_hadoop_shaded_jar" ]]; then
+ if [[ -n "$flink_hadoop_shaded_jar" ]]; then
echo ""
else
if [[ ! ( -x "$(command -v hadoop)" ) && ( "${ZEPPELIN_INTERPRETER_LAUNCHER}" != "yarn" ) ]]; then
echo 'Error: hadoop is not in PATH when HADOOP_CONF_DIR is specified and no flink-shaded-hadoop jar '
exit 1
fi
- ZEPPELIN_INTP_CLASSPATH+=":`hadoop classpath`"
+ ZEPPELIN_INTP_CLASSPATH+=":$(hadoop classpath)"
fi
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
else
@@ -271,59 +269,26 @@ fi
addJarInDirForIntp "${LOCAL_INTERPRETER_REPO}"
-if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]]; then
+if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]]; then
if [[ "${INTERPRETER_ID}" != "spark" || "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" == "false" ]]; then
- suid="$(id -u ${ZEPPELIN_IMPERSONATE_USER})"
+ suid="$(id -u "${ZEPPELIN_IMPERSONATE_USER}")"
if [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
- INTERPRETER_RUN_COMMAND=${ZEPPELIN_IMPERSONATE_RUN_CMD}" '"
+ INTERPRETER_RUN_COMMAND+=("${ZEPPELIN_IMPERSONATE_RUN_CMD[@]}")
if [[ -f "${ZEPPELIN_CONF_DIR}/zeppelin-env.sh" ]]; then
- INTERPRETER_RUN_COMMAND+=" source "${ZEPPELIN_CONF_DIR}'/zeppelin-env.sh;'
+ INTERPRETER_RUN_COMMAND+=("source" "${ZEPPELIN_CONF_DIR}/zeppelin-env.sh;")
fi
fi
fi
fi
if [[ -n "${SPARK_SUBMIT}" ]]; then
- INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${CALLBACK_HOST} ${PORT} \"${INTP_GROUP_ID}\" ${INTP_PORT}`
-else
- INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" ${ZEPPELIN_SERVER} ${CALLBACK_HOST} ${PORT} \"${INTP_GROUP_ID}\" ${INTP_PORT}`
-fi
-
-
-if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
- INTERPRETER_RUN_COMMAND+="'"
-fi
-
-echo "Interpreter launch command: $INTERPRETER_RUN_COMMAND"
-eval $INTERPRETER_RUN_COMMAND &
-pid=$!
-
-if [[ -z "${pid}" ]]; then
- exit 1;
+ IFS=' ' read -r -a SPARK_SUBMIT_OPTIONS_ARRAY <<< "${SPARK_SUBMIT_OPTIONS}"
+ IFS=' ' read -r -a ZEPPELIN_SPARK_CONF_ARRAY <<< "${ZEPPELIN_SPARK_CONF}"
+ INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
else
- echo ${pid} > "${ZEPPELIN_PID}"
+ IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}"
+ IFS=' ' read -r -a ZEPPELIN_INTP_MEM_ARRAY <<< "${ZEPPELIN_INTP_MEM}"
+ INTERPRETER_RUN_COMMAND+=("${ZEPPELIN_RUNNER}" "${JAVA_INTP_OPTS_ARRAY[@]}" "${ZEPPELIN_INTP_MEM_ARRAY[@]}" "-cp" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "${ZEPPELIN_SERVER}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
fi
-
-trap 'shutdown_hook;' SIGTERM SIGINT SIGQUIT
-function shutdown_hook() {
- local count
- count=0
- echo "trying to shutdown..."
- while [[ "${count}" -lt 10 ]]; do
- $(kill ${pid} > /dev/null 2> /dev/null)
- if kill -0 ${pid} > /dev/null 2>&1; then
- sleep 3
- let "count+=1"
- else
- break
- fi
- if [[ "${count}" == "5" ]]; then
- $(kill -9 ${pid} > /dev/null 2> /dev/null)
- fi
- done
-}
-
-wait
-
-rm -f "${ZEPPELIN_PID}" > /dev/null 2> /dev/null
+exec "${INTERPRETER_RUN_COMMAND[@]}"
diff --git a/k8s/interpreter/100-interpreter-spec.yaml b/k8s/interpreter/100-interpreter-spec.yaml
index 76f1dea..116b0df 100644
--- a/k8s/interpreter/100-interpreter-spec.yaml
+++ b/k8s/interpreter/100-interpreter-spec.yaml
@@ -43,12 +43,22 @@ spec:
containers:
- name: {{zeppelin.k8s.interpreter.container.name}}
image: {{zeppelin.k8s.interpreter.container.image}}
- command: ["sh", "-c", "$(ZEPPELIN_HOME)/bin/interpreter.sh -d $(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}} -r {{zeppelin.k8s.interpreter.rpc.portRange}} -c {{zeppelin.k8s.server.rpc.service}} -p {{zeppelin.k8s.server.rpc.portRange}} -i {{zeppelin.k8s.interpreter.group.id}} -l {{zeppelin.k8s.interpreter.localRepo}} -g {{zeppelin.k8s.interpreter.setting.name}}"]
- lifecycle:
- preStop:
- exec:
- # SIGTERM triggers a quick exit; gracefully terminate instead
- command: ["sh", "-c", "ps -ef | grep org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer | grep -v grep | awk '{print $2}' | xargs kill"]
+ args:
+ - "$(ZEPPELIN_HOME)/bin/interpreter.sh"
+ - "-d"
+ - "$(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}}"
+ - "-r"
+ - "{{zeppelin.k8s.interpreter.rpc.portRange}}"
+ - "-c"
+ - "{{zeppelin.k8s.server.rpc.service}}"
+ - "-p"
+ - "{{zeppelin.k8s.server.rpc.portRange}}"
+ - "-i"
+ - "{{zeppelin.k8s.interpreter.group.id}}"
+ - "-l"
+ - "{{zeppelin.k8s.interpreter.localRepo}}/{{zeppelin.k8s.interpreter.setting.name}}"
+ - "-g"
+ - "{{zeppelin.k8s.interpreter.setting.name}}"
env:
{% for key, value in zeppelin.k8s.envs.items() %}
- name: {{key}}
diff --git a/scripts/docker/zeppelin-interpreter/Dockerfile b/scripts/docker/zeppelin-interpreter/Dockerfile
index 090d1cc..0c9d956 100644
--- a/scripts/docker/zeppelin-interpreter/Dockerfile
+++ b/scripts/docker/zeppelin-interpreter/Dockerfile
@@ -26,7 +26,7 @@ ENV VERSION="${version}" \
RUN set -ex && \
apt-get -y update && \
- DEBIAN_FRONTEND=noninteractive apt-get install -y openjdk-8-jre-headless wget && \
+ DEBIAN_FRONTEND=noninteractive apt-get install -y openjdk-8-jre-headless wget tini && \
# Cleanup
rm -rf /var/lib/apt/lists/* && \
apt-get autoclean && \
@@ -80,4 +80,5 @@ RUN mkdir -p "${Z_HOME}/logs" "${Z_HOME}/run" "${Z_HOME}/local-repo" && \
chmod -R 775 "${Z_HOME}/logs" "${Z_HOME}/run" "${Z_HOME}/local-repo"
USER 1000
+ENTRYPOINT [ "/usr/bin/tini", "--" ]
WORKDIR ${Z_HOME}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index 2bf9adf..453a7e3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.cluster.ClusterManagerClient;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@@ -73,9 +74,6 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
-
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -109,6 +107,8 @@ public class RemoteInterpreterServer extends Thread
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterServer.class);
+ private static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000;
+
private String interpreterGroupId;
private InterpreterGroup interpreterGroup;
private AngularObjectRegistry angularObjectRegistry;
@@ -130,10 +130,6 @@ public class RemoteInterpreterServer extends Thread
private final Map<String, RunningApplication> runningApplications =
Collections.synchronizedMap(new HashMap<String, RunningApplication>());
- private Map<String, Object> remoteWorksResponsePool;
-
- private static final long DEFAULT_SHUTDOWN_TIMEOUT = 2000;
-
// Hold information for manual progress update
private ConcurrentMap<String, Integer> progressMap = new ConcurrentHashMap<>();
@@ -150,6 +146,8 @@ public class RemoteInterpreterServer extends Thread
// cluster manager client
private ClusterManagerClient clusterManagerClient;
+ private static Thread shutdownThread;
+
public RemoteInterpreterServer(String intpEventServerHost,
int intpEventServerPort,
String interpreterGroupId,
@@ -166,6 +164,8 @@ public class RemoteInterpreterServer extends Thread
if (null != intpEventServerHost) {
this.intpEventServerHost = intpEventServerHost;
this.intpEventServerPort = intpEventServerPort;
+ this.port = RemoteInterpreterUtils.findAvailablePort(portRange);
+ this.host = RemoteInterpreterUtils.findAvailableHostAddress();
if (!isTest) {
LOGGER.info("Starting remote interpreter server on port {}, intpEventServerAddress: {}:{}", port,
intpEventServerHost, intpEventServerPort);
@@ -177,69 +177,30 @@ public class RemoteInterpreterServer extends Thread
}
this.isTest = isTest;
this.interpreterGroupId = interpreterGroupId;
- RemoteInterpreterService.Processor<RemoteInterpreterServer> processor =
- new RemoteInterpreterService.Processor<>(this);
- TServerSocket serverTransport;
- if (null == intpEventServerHost) {
- // Dev Interpreter
- serverTransport = new TServerSocket(intpEventServerPort);
- } else {
- serverTransport = RemoteInterpreterUtils.createTServerSocket(portRange);
- this.port = serverTransport.getServerSocket().getLocalPort();
- this.host = RemoteInterpreterUtils.findAvailableHostAddress();
- LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port);
- }
- server = new TThreadPoolServer(
- new TThreadPoolServer.Args(serverTransport).processor(processor));
- remoteWorksResponsePool = Collections.synchronizedMap(new HashMap<String, Object>());
-
}
@Override
public void run() {
- if (null != intpEventServerHost && !isTest) {
- new Thread(new Runnable() {
- boolean interrupted = false;
-
- @Override
- public void run() {
- while (!interrupted && !server.isServing()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- if (!interrupted) {
- RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
- try {
- LOGGER.info("Registering interpreter process");
- intpEventClient.registerInterpreterProcess(registerInfo);
- LOGGER.info("Registered interpreter process");
- } catch (Exception e) {
- LOGGER.error("Error while registering interpreter: {}, cause: {}", registerInfo, e);
- try {
- shutdown();
- } catch (TException e1) {
- LOGGER.warn("Exception occurs while shutting down", e1);
- }
- }
- }
-
- if (launcherEnv != null && "yarn".endsWith(launcherEnv)) {
- try {
- YarnUtils.register(host, port);
- ScheduledExecutorService yarnHeartbeat = ExecutorFactory.singleton()
- .createOrGetScheduled("RM-Heartbeat", 1);
- yarnHeartbeat.scheduleAtFixedRate(YarnUtils::heartbeat, 0, 1, TimeUnit.MINUTES);
- } catch (Exception e) {
- LOGGER.error("Fail to register yarn app", e);
- }
- }
- }
- }).start();
+ RemoteInterpreterService.Processor<RemoteInterpreterServer> processor =
+ new RemoteInterpreterService.Processor<>(this);
+ try (TServerSocket tSocket = new TServerSocket(port)){
+ server = new TThreadPoolServer(
+ new TThreadPoolServer.Args(tSocket)
+ .stopTimeoutVal(DEFAULT_SHUTDOWN_TIMEOUT)
+ .stopTimeoutUnit(TimeUnit.MILLISECONDS)
+ .processor(processor));
+
+ if (null != intpEventServerHost && !isTest) {
+ Thread registerThread = new Thread(new RegisterRunnable());
+ registerThread.setName("RegisterThread");
+ registerThread.start();
+ }
+ LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port);
+ server.serve();
+ } catch (TTransportException e) {
+ LOGGER.error("Failure in TTransport", e);
}
- server.serve();
+ LOGGER.info("RemoteInterpreterServer-Thread finished");
}
@Override
@@ -270,7 +231,6 @@ public class RemoteInterpreterServer extends Thread
@Override
public void shutdown() throws TException {
-
// unRegisterInterpreterProcess should be a sync operation (outside of shutdown thread),
// otherwise it would cause data mismatch between zeppelin server & interpreter process.
// e.g. zeppelin server start a new interpreter process, while previous interpreter process
@@ -283,62 +243,16 @@ public class RemoteInterpreterServer extends Thread
LOGGER.error("Fail to unregister remote interpreter process", e);
}
}
-
- Thread shutDownThread = new Thread(() -> {
- LOGGER.info("Shutting down...");
- // delete interpreter cluster meta
- deleteClusterMeta();
-
- if (interpreterGroup != null) {
- synchronized (interpreterGroup) {
- for (List<Interpreter> session : interpreterGroup.values()) {
- for (Interpreter interpreter : session) {
- try {
- interpreter.close();
- } catch (InterpreterException e) {
- LOGGER.warn("Fail to close interpreter", e);
- }
- }
- }
- }
+ if (shutdownThread != null) {
+ // no need to call shutdownhook twice
+ if (Runtime.getRuntime().removeShutdownHook(shutdownThread)) {
+ LOGGER.debug("ShutdownHook removed, because of a regular shutdown");
+ } else {
+ LOGGER.warn("The ShutdownHook could not be removed");
}
- if (!isTest) {
- SchedulerFactory.singleton().destroy();
- ExecutorFactory.singleton().shutdownAll();
- }
-
- if ("yarn".equals(launcherEnv)) {
- try {
- YarnUtils.unregister(true, "");
- } catch (Exception e) {
- LOGGER.error("Fail to unregister yarn app", e);
- }
- }
-
- server.stop();
-
- // server.stop() does not always finish server.serve() loop
- // sometimes server.serve() is hanging even after server.stop() call.
- // this case, need to force kill the process
-
- long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime < DEFAULT_SHUTDOWN_TIMEOUT &&
- server.isServing()) {
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- LOGGER.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e);
- }
- }
-
- if (server.isServing()) {
- LOGGER.info("Force shutting down");
- System.exit(0);
- }
-
- LOGGER.info("Shutting down");
- }, "Shutdown-Thread");
+ }
+ Thread shutDownThread = new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_CALL);
shutDownThread.start();
}
@@ -387,21 +301,21 @@ public class RemoteInterpreterServer extends Thread
new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange);
remoteInterpreterServer.start();
- // add signal handler
- Signal.handle(new Signal("TERM"), new SignalHandler() {
- @Override
- public void handle(Signal signal) {
- try {
- LOGGER.info("Receive TERM Signal");
- remoteInterpreterServer.shutdown();
- } catch (TException e) {
- LOGGER.error("Error on shutdown RemoteInterpreterServer", e);
- }
- }
- });
+ /*
+ * Registration of a ShutdownHook in case of an unpredictable system call
+ * Examples: STRG+C, SIGTERM via kill
+ */
+ shutdownThread = remoteInterpreterServer.new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_HOOK);
+ Runtime.getRuntime().addShutdownHook(shutdownThread);
remoteInterpreterServer.join();
LOGGER.info("RemoteInterpreterServer thread is finished");
+
+ /* TODO(pdallig): Remove System.exit(0) if the thrift server can be shut down successfully.
+ * https://github.com/apache/thrift/commit/9cb1c794cd39cfb276771f8e52f0306eb8d462fd
+ * should be part of the next release and solve the problem.
+ * We may have other threads that are not terminated successfully.
+ */
System.exit(0);
}
@@ -425,20 +339,6 @@ public class RemoteInterpreterServer extends Thread
clusterManagerClient.putClusterMeta(INTP_PROCESS_META, interpreterGroupId, meta);
}
- private void deleteClusterMeta() {
- if (!zConf.isClusterMode()){
- return;
- }
-
- try {
- // delete interpreter cluster meta
- clusterManagerClient.deleteClusterMeta(INTP_PROCESS_META, interpreterGroupId);
- Thread.sleep(300);
- } catch (InterruptedException e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
@Override
public void createInterpreter(String interpreterGroupId, String sessionId, String
className, Map<String, String> properties, String userName) throws TException {
@@ -683,6 +583,144 @@ public class RemoteInterpreterServer extends Thread
context.getNoteGui());
}
+ class RegisterRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ LOGGER.info("Start registration");
+ // wait till the server is serving
+ while (!Thread.currentThread().isInterrupted() && server != null && !server.isServing()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOGGER.info("InterruptedException received", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (!Thread.currentThread().isInterrupted()) {
+ RegisterInfo registerInfo = new RegisterInfo(host, port, interpreterGroupId);
+ try {
+ LOGGER.info("Registering interpreter process");
+ intpEventClient.registerInterpreterProcess(registerInfo);
+ LOGGER.info("Registered interpreter process");
+ } catch (Exception e) {
+ LOGGER.error("Error while registering interpreter: {}, cause: {}", registerInfo, e);
+ try {
+ shutdown();
+ } catch (TException e1) {
+ LOGGER.warn("Exception occurs while shutting down", e1);
+ }
+ }
+ }
+
+ if (launcherEnv != null && "yarn".endsWith(launcherEnv)) {
+ try {
+ YarnUtils.register(host, port);
+ ScheduledExecutorService yarnHeartbeat = ExecutorFactory.singleton()
+ .createOrGetScheduled("RM-Heartbeat", 1);
+ yarnHeartbeat.scheduleAtFixedRate(YarnUtils::heartbeat, 0, 1, TimeUnit.MINUTES);
+ } catch (Exception e) {
+ LOGGER.error("Fail to register yarn app", e);
+ }
+ }
+ LOGGER.info("Registration finished");
+ }
+ }
+
+ class ShutdownThread extends Thread {
+ private final String cause;
+
+ public static final String CAUSE_SHUTDOWN_HOOK = "ShutdownHook";
+ public static final String CAUSE_SHUTDOWN_CALL = "ShutdownCall";
+
+ public ShutdownThread(String cause) {
+ super("ShutdownThread");
+ this.cause = cause;
+ }
+
+ @Override
+ public void run() {
+ LOGGER.info("Shutting down...");
+ LOGGER.info("Shutdown initialized by {}", cause);
+ // delete interpreter cluster meta
+ deleteClusterMeta();
+
+ if (interpreterGroup != null) {
+ synchronized (interpreterGroup) {
+ for (List<Interpreter> session : interpreterGroup.values()) {
+ for (Interpreter interpreter : session) {
+ try {
+ interpreter.close();
+ } catch (InterpreterException e) {
+ LOGGER.warn("Fail to close interpreter", e);
+ }
+ }
+ }
+ }
+ }
+ if (!isTest) {
+ SchedulerFactory.singleton().destroy();
+ ExecutorFactory.singleton().shutdownAll();
+ }
+
+ if ("yarn".equals(launcherEnv)) {
+ try {
+ YarnUtils.unregister(true, "");
+ } catch (Exception e) {
+ LOGGER.error("Fail to unregister yarn app", e);
+ }
+ }
+ // Try to unregister the interpreter process in case the interpreter process exit unpredictable via ShutdownHook
+ if (intpEventClient != null && CAUSE_SHUTDOWN_HOOK.equals(cause)) {
+ try {
+ LOGGER.info("Unregister interpreter process");
+ intpEventClient.unRegisterInterpreterProcess();
+ } catch (Exception e) {
+ LOGGER.error("Fail to unregister remote interpreter process", e);
+ }
+ }
+
+ server.stop();
+
+ // server.stop() does not always finish server.serve() loop
+ // sometimes server.serve() is hanging even after server.stop() call.
+ // this case, need to force kill the process
+
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < (DEFAULT_SHUTDOWN_TIMEOUT + 100) &&
+ server.isServing()) {
+ try {
+ Thread.sleep(300);
+ } catch (InterruptedException e) {
+ LOGGER.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if (server.isServing()) {
+ LOGGER.info("Force shutting down");
+ System.exit(1);
+ }
+
+ LOGGER.info("Shutting down");
+ }
+
+ private void deleteClusterMeta() {
+ if (zConf == null || !zConf.isClusterMode()){
+ return;
+ }
+
+ try {
+ // delete interpreter cluster meta
+ clusterManagerClient.deleteClusterMeta(INTP_PROCESS_META, interpreterGroupId);
+ Thread.sleep(300);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
class InterpretJobListener implements JobListener {
@Override
@@ -1385,6 +1423,4 @@ public class RemoteInterpreterServer extends Thread
this.paragraphId = paragraphId;
}
}
-
- ;
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 5a0a3ab..e30b325 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -18,8 +18,6 @@
package org.apache.zeppelin.interpreter.remote;
import org.apache.commons.lang3.StringUtils;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,31 +44,18 @@ public class RemoteInterpreterUtils {
}
public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
- int port;
- try (ServerSocket socket = new ServerSocket(0);) {
- port = socket.getLocalPort();
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
}
- return port;
}
- /**
- * start:end
- *
- * @param portRange
- * @return
- * @throws IOException
- */
- public static TServerSocket createTServerSocket(String portRange)
- throws IOException {
-
- TServerSocket tSocket = null;
+ public static int findAvailablePort(String portRange) throws IOException {
// ':' is the default value which means no constraints on the portRange
if (StringUtils.isBlank(portRange) || portRange.equals(":")) {
- try {
- tSocket = new TServerSocket(0);
- return tSocket;
- } catch (TTransportException e) {
- throw new IOException("Fail to create TServerSocket", e);
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ } catch (IOException e) {
+ throw new IOException("Failed to allocate a automatic port", e);
}
}
// valid user registered port https://en.wikipedia.org/wiki/Registered_port
@@ -84,10 +69,9 @@ public class RemoteInterpreterUtils {
end = Integer.parseInt(ports[1]);
}
for (int i = start; i <= end; ++i) {
- try {
- tSocket = new TServerSocket(i);
- return tSocket;
- } catch (Exception e) {
+ try (ServerSocket socket = new ServerSocket(i)) {
+ return socket.getLocalPort();
+ } catch (IOException e) {
// ignore this
}
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java
index 40d3dec..655bb4f 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/YarnUtils.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
*/
public class YarnUtils {
- private static Logger LOGGER = LoggerFactory.getLogger(YarnUtils.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(YarnUtils.class);
private static AMRMClient<ContainerRequest> amClient = AMRMClient.createAMRMClient();
private static Configuration conf = new YarnConfiguration();
@@ -46,7 +46,7 @@ public class YarnUtils {
}
public static void register(String host, int port) throws Exception {
- LOGGER.info("Registering yarn app at " + host + ":" + port);
+ LOGGER.info("Registering yarn app at {}:{}", host, port);
try {
amClient.registerApplicationMaster(host, port, null);
} catch (YarnException e) {
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
index 9f54e2c..abe6d0a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java
@@ -96,7 +96,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
LOGGER.info("Process is launched: {}", commandLine);
} catch (IOException e) {
this.processOutput.stopCatchLaunchOutput();
- LOGGER.error("Fail to launch process: " + commandLine, e);
+ LOGGER.error("Fail to launch process: {}", commandLine, e);
transition(State.TERMINATED);
errorMessage = e.getMessage();
}
@@ -106,7 +106,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
public void transition(State state) {
this.state = state;
- LOGGER.info("Process state is transitioned to " + state);
+ LOGGER.info("Process state is transitioned to {}", state);
}
public void onTimeout() {
@@ -121,7 +121,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
@Override
public void onProcessComplete(int exitValue) {
- LOGGER.warn("Process is exited with exit value " + exitValue);
+ LOGGER.warn("Process is exited with exit value {}", exitValue);
if (exitValue == 0) {
transition(State.COMPLETED);
} else {
@@ -131,7 +131,8 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
@Override
public void onProcessFailed(ExecuteException e) {
- LOGGER.warn("Process is failed due to " + e);
+ LOGGER.warn("Process with cmd {} is failed due to", commandLine, e);
+
errorMessage = ExceptionUtils.getStackTrace(e);
transition(State.TERMINATED);
}
@@ -187,7 +188,7 @@ public abstract class ProcessLauncher implements ExecuteResultHandler {
if (s.startsWith("Interpreter launch command")) {
LOGGER.info(s);
} else {
- LOGGER.debug("Process Output: " + s);
+ LOGGER.debug("Process Output: {}", s);
}
if (catchLaunchOutput) {
launchOutput.append(s + "\n");
diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
index 68981da..1d6fb52 100644
--- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
+++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtilsTest.java
@@ -26,23 +26,18 @@ import static org.junit.Assert.assertTrue;
public class RemoteInterpreterUtilsTest {
@Test
- public void testCreateTServerSocket() throws IOException {
- assertTrue(RemoteInterpreterUtils.createTServerSocket(":")
- .getServerSocket().getLocalPort() > 0);
+ public void testfindAvailablePort() throws IOException {
+ assertTrue(RemoteInterpreterUtils.findAvailablePort(":") > 0);
String portRange = ":30000";
- assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange)
- .getServerSocket().getLocalPort() <= 30000);
+ assertTrue(RemoteInterpreterUtils.findAvailablePort(portRange) <= 30000);
portRange = "30000:";
- assertTrue(RemoteInterpreterUtils.createTServerSocket(portRange)
- .getServerSocket().getLocalPort() >= 30000);
+ assertTrue(RemoteInterpreterUtils.findAvailablePort(portRange) >= 30000);
portRange = "30000:40000";
- int port = RemoteInterpreterUtils.createTServerSocket(portRange)
- .getServerSocket().getLocalPort();
+ int port = RemoteInterpreterUtils.findAvailablePort(portRange);
assertTrue(port >= 30000 && port <= 40000);
}
-
}
diff --git a/zeppelin-plugins/notebookrepo/s3/src/test/resources/logback.xml b/zeppelin-plugins/notebookrepo/s3/src/test/resources/logback.xml
new file mode 100644
index 0000000..0078a20
--- /dev/null
+++ b/zeppelin-plugins/notebookrepo/s3/src/test/resources/logback.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+ </encoder>
+ </appender>
+ <root level="INFO">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration>
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
index aacccc8..b0be31a 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java
@@ -22,6 +22,7 @@ import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.helium.ApplicationEventListener;
@@ -97,21 +98,19 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi
Thread startingThread = new Thread() {
@Override
public void run() {
- TServerSocket tSocket = null;
- try {
- tSocket = RemoteInterpreterUtils.createTServerSocket(portRange);
+ try (TServerSocket tSocket = new TServerSocket(RemoteInterpreterUtils.findAvailablePort(portRange))){
port = tSocket.getServerSocket().getLocalPort();
host = RemoteInterpreterUtils.findAvailableHostAddress();
- } catch (IOException e1) {
- throw new RuntimeException(e1);
+ LOGGER.info("InterpreterEventServer is starting at {}:{}", host, port);
+ RemoteInterpreterEventService.Processor<RemoteInterpreterEventServer> processor =
+ new RemoteInterpreterEventService.Processor<>(RemoteInterpreterEventServer.this);
+ thriftServer = new TThreadPoolServer(
+ new TThreadPoolServer.Args(tSocket).processor(processor));
+ thriftServer.serve();
+ } catch (IOException | TTransportException e ) {
+ throw new RuntimeException("Fail to create TServerSocket", e);
}
-
- LOGGER.info("InterpreterEventServer is starting at {}:{}", host, port);
- RemoteInterpreterEventService.Processor processor =
- new RemoteInterpreterEventService.Processor(RemoteInterpreterEventServer.this);
- thriftServer = new TThreadPoolServer(
- new TThreadPoolServer.Args(tSocket).processor(processor));
- thriftServer.serve();
+ LOGGER.info("ThriftServer-Thread finished");
}
};
startingThread.start();
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 7de01db..dba9b03 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -144,7 +144,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
&& jar.toFile().getName().endsWith(".jar"))
.map(jar -> jar.toAbsolutePath().toString())
.collect(Collectors.toList());
- if (interpreterJars.size() == 0) {
+ if (interpreterJars.isEmpty()) {
throw new IOException("zeppelin-interpreter-shaded jar is not found");
} else if (interpreterJars.size() > 1) {
throw new IOException("more than 1 zeppelin-interpreter-shaded jars are found: "
@@ -171,7 +171,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
sparkConfBuilder.append(" --proxy-user " + context.getUserName());
}
- env.put("ZEPPELIN_SPARK_CONF", escapeSpecialCharacter(sparkConfBuilder.toString()));
+ env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
// set these env in the order of
// 1. interpreter-setting
@@ -215,10 +215,10 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
env.put("ZEPPELIN_INTP_CLASSPATH", driverExtraClassPath);
}
} else {
- LOGGER.warn("spark-defaults.conf doesn't exist: " + sparkDefaultFile.getAbsolutePath());
+ LOGGER.warn("spark-defaults.conf doesn't exist: {}", sparkDefaultFile.getAbsolutePath());
}
- LOGGER.debug("buildEnvFromProperties: " + env);
+ LOGGER.debug("buildEnvFromProperties: {}", env);
return env;
}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index 4a7189b..5195710 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -107,8 +107,8 @@ public class SparkInterpreterLauncherTest {
assertTrue(interpreterProcess.getEnv().size() >= 2);
assertEquals(sparkHome, interpreterProcess.getEnv().get("SPARK_HOME"));
assertFalse(interpreterProcess.getEnv().containsKey("ENV_1"));
- assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.files=file_1" +
- " --conf spark.jars=jar_1 --conf spark.app.name=intpGroupId --conf spark.master=local[*]"),
+ assertEquals(" --conf spark.files=file_1" +
+ " --conf spark.jars=jar_1 --conf spark.app.name=intpGroupId --conf spark.master=local[*]",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@@ -138,9 +138,9 @@ public class SparkInterpreterLauncherTest {
String sparkJars = "jar_1";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1";
- assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
+ assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
" --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
- " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn-client"),
+ " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn-client",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@@ -171,10 +171,10 @@ public class SparkInterpreterLauncherTest {
String sparkJars = "jar_1";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1";
- assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
+ assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
" --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
" --conf spark.submit.deployMode=client" +
- " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn"),
+ " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@@ -207,13 +207,13 @@ public class SparkInterpreterLauncherTest {
zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
- assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
+ assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
" --conf spark.yarn.maxAppAttempts=1" +
" --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
" --conf spark.yarn.isPython=true" +
" --conf spark.yarn.submit.waitAppCompletion=false" +
" --conf spark.app.name=intpGroupId" +
- " --conf spark.master=yarn-cluster"),
+ " --conf spark.master=yarn-cluster",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@@ -253,14 +253,14 @@ public class SparkInterpreterLauncherTest {
zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
String sparkFiles = "file_1," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
- assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
+ assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
" --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId" +
" --conf spark.yarn.maxAppAttempts=1" +
" --conf spark.master=yarn" +
" --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
" --conf spark.submit.deployMode=cluster" +
" --conf spark.yarn.submit.waitAppCompletion=false" +
- " --proxy-user user1"),
+ " --proxy-user user1",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar"));
FileUtils.deleteDirectory(localRepoPath.toFile());
@@ -302,7 +302,7 @@ public class SparkInterpreterLauncherTest {
String sparkrZip = sparkHome + "/R/lib/sparkr.zip#sparkr";
// escape special characters
String sparkFiles = "{}," + zeppelinHome + "/conf/log4j_yarn_cluster.properties";
- assertEquals(InterpreterLauncher.escapeSpecialCharacter(" --conf spark.yarn.dist.archives=" + sparkrZip +
+ assertEquals(" --conf spark.yarn.dist.archives=" + sparkrZip +
" --conf spark.yarn.isPython=true" +
" --conf spark.app.name=intpGroupId" +
" --conf spark.yarn.maxAppAttempts=1" +
@@ -310,7 +310,7 @@ public class SparkInterpreterLauncherTest {
" --conf spark.files=" + sparkFiles + " --conf spark.jars=" + sparkJars +
" --conf spark.submit.deployMode=cluster" +
" --conf spark.yarn.submit.waitAppCompletion=false" +
- " --proxy-user user1"),
+ " --proxy-user user1",
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
FileUtils.deleteDirectory(localRepoPath.toFile());
}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
index 20056ec..9554dcf 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java
@@ -85,6 +85,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
private StatusChangedListener afterStatusChangedListener;
private QuartzSchedulerService schedulerService;
+ @Override
@Before
public void setUp() throws Exception {
System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_PUBLIC.getVarName(), "true");
@@ -105,6 +106,7 @@ public class NotebookTest extends AbstractInterpreterTest implements ParagraphJo
schedulerService.waitForFinishInit();
}
+ @Override
@After
public void tearDown() throws Exception {
super.tearDown();