You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2020/07/13 15:07:04 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4915] K8s with java lib
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 8af054e [ZEPPELIN-4915] K8s with java lib
8af054e is described below
commit 8af054e851a5f3cae5238a311600279d630354ed
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Wed Jul 8 18:21:37 2020 +0200
[ZEPPELIN-4915] K8s with java lib
### What is this PR for?
This PR changes from a Kubectl binary to a Java library for connecting to a Kubernetes cluster.
I decided to use [fabric8/kubernetes-client](https://github.com/fabric8io/kubernetes-client), because it's also used by the [Apache Spark project](https://github.com/fabric8io/kubernetes-client#who-uses-kubernetes--openshift-java-client) and allows the import of our generic files rendered with jinja2.
### What type of PR is it?
- Improvement
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4915
### How should this be tested?
* Travis-CI: https://travis-ci.org/github/Reamer/zeppelin/builds/706228986
### Questions:
* Do the license files need an update? Yes
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Philipp Dallig <ph...@gmail.com>
Closes #3825 from Reamer/k8s_with_java_lib and squashes the following commits:
2cb46a7c4 [Philipp Dallig] Describe how an Auth-Token can be provided for communication with K8s
c6a7cda78 [Philipp Dallig] Added the ability to use a custom namespace for development
1f50b3971 [Philipp Dallig] Correct rendering of template
495c90bb1 [Philipp Dallig] accept the proposals of the review
37acc5e25 [Philipp Dallig] Add license for kubernetes-client
a95bd1988 [Philipp Dallig] Some cleanup
827cb5df4 [Philipp Dallig] Rewrite thread handling for K8s Launcher
143a5b5ec [Philipp Dallig] Switch from kubectl binary to java library
ce9fce12a [Philipp Dallig] Update jinja to a new version
bdbcec83f [Philipp Dallig] Add LstripBlock and TrimBlocks in Jinja rendering
c4906f27b [Philipp Dallig] cleanup RemoteInterpreterUtils
(cherry picked from commit b70a1b81b4fe1bea81313721f3ab1abcab0d7929)
Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
LICENSE | 1 +
docs/quickstart/kubernetes.md | 3 +-
scripts/docker/zeppelin/bin/Dockerfile | 7 -
.../zeppelin/conf/ZeppelinConfiguration.java | 6 +-
.../interpreter/launcher/InterpreterLauncher.java | 19 +-
.../interpreter/remote/RemoteInterpreterUtils.java | 28 +--
zeppelin-plugins/launcher/k8s-standard/pom.xml | 77 ++++++++-
.../launcher/K8sRemoteInterpreterProcess.java | 191 ++++++++++-----------
.../interpreter/launcher/K8sSpecTemplate.java | 11 +-
.../launcher/K8sStandardInterpreterLauncher.java | 33 ++--
.../zeppelin/interpreter/launcher/Kubectl.java | 155 -----------------
.../launcher/K8sRemoteInterpreterProcessTest.java | 91 ++++------
.../interpreter/launcher/K8sSpecTemplateTest.java | 15 ++
.../K8sStandardInterpreterLauncherTest.java | 43 ++---
.../zeppelin/interpreter/launcher/KubectlTest.java | 105 -----------
15 files changed, 274 insertions(+), 511 deletions(-)
diff --git a/LICENSE b/LICENSE
index d6460e9..5bd3e1a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -267,6 +267,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
(Apache 2.0) Embedded MongoDB (https://github.com/flapdoodle-oss/de.flapdoodle.embed.mongo)
(Apache 2.0) Kotlin (https://github.com/JetBrains/kotlin)
(Apache 2.0) s3proxy (https://github.com/gaul/s3proxy)
+ (Apache 2.0) kubernetes-client (https://github.com/fabric8io/kubernetes-client)
========================================================================
BSD 3-Clause licenses
diff --git a/docs/quickstart/kubernetes.md b/docs/quickstart/kubernetes.md
index cd45912..2cfbbca 100644
--- a/docs/quickstart/kubernetes.md
+++ b/docs/quickstart/kubernetes.md
@@ -264,5 +264,6 @@ Zeppelin can run locally (such as inside your IDE in debug mode) and able to run
| ZEPPELIN_K8S_PORTFORWARD | true | Enable port forwarding from local Zeppelin instance to Interpreters running on Kubernetes |
| ZEPPELIN_K8S_CONTAINER_IMAGE | <image>:<version> | Zeppelin interpreter docker image to use |
| ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE | <image>:<version> | Spark docker image to use |
+| ZEPPELIN_K8S_NAMESPACE | <k8s namespace> | Kubernetes namespace to use |
+| KUBERNETES_AUTH_TOKEN | <token> | Kubernetes auth token to create resources |
-`kubectl` command need to be configured to connect your Kubernetes cluster.
diff --git a/scripts/docker/zeppelin/bin/Dockerfile b/scripts/docker/zeppelin/bin/Dockerfile
index 6745b46..e1bcba8 100644
--- a/scripts/docker/zeppelin/bin/Dockerfile
+++ b/scripts/docker/zeppelin/bin/Dockerfile
@@ -97,13 +97,6 @@ RUN echo "$LOG_TAG Install R related packages" && \
R -e "install.packages('Rcpp', repos='http://cran.us.r-project.org')" && \
Rscript -e "library('devtools'); library('Rcpp'); install_github('ramnathv/rCharts')"
-# Install kubectl
-RUN apt-get install -y apt-transport-https && \
- curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \
- echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list && \
- apt-get update && \
- apt-get install -y kubectl
-
RUN echo "$LOG_TAG Cleanup" && \
apt-get autoclean && \
apt-get clean
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 7331a50..112bdbf 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -828,8 +828,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getBoolean(ConfVars.ZEPPELIN_K8S_PORTFORWARD);
}
- public String getK8sKubectlCmd() {
- return getString(ConfVars.ZEPPELIN_K8S_KUBECTL);
+ public String getK8sNamepsace() {
+ return getString(ConfVars.ZEPPELIN_K8S_NAMESPACE);
}
public String getK8sContainerImage() {
@@ -1032,8 +1032,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_RUN_MODE("zeppelin.run.mode", "auto"), // auto | local | k8s | Docker
ZEPPELIN_K8S_PORTFORWARD("zeppelin.k8s.portforward", false), // kubectl port-forward incase of Zeppelin is running outside of kuberentes
- ZEPPELIN_K8S_KUBECTL("zeppelin.k8s.kubectl", "kubectl"), // kubectl command
ZEPPELIN_K8S_CONTAINER_IMAGE("zeppelin.k8s.container.image", "apache/zeppelin:" + Util.getVersion()),
+ ZEPPELIN_K8S_NAMESPACE("zeppelin.k8s.namespace", "default"), // specify a namespace incase of Zeppelin is running outside of kuberentes
ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE("zeppelin.k8s.spark.container.image", "apache/spark:latest"),
ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"),
ZEPPELIN_K8S_SERVICE_NAME("zeppelin.k8s.service.name", "zeppelin-server"),
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
index 1fb2ea9..0d90fc4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
@@ -17,23 +17,21 @@
package org.apache.zeppelin.interpreter.launcher;
+import java.io.IOException;
+import java.util.Properties;
+
import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.InterpreterRunner;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Properties;
-
/**
* Component to Launch interpreter process.
*/
public abstract class InterpreterLauncher {
- private static Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class);
- private static String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`";
+ private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class);
+ private static final String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`";
protected ZeppelinConfiguration zConf;
protected Properties properties;
@@ -89,14 +87,13 @@ public abstract class InterpreterLauncher {
recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
if (recoveredClient != null) {
if (recoveredClient.isRunning()) {
- LOGGER.info("Recover interpreter process running at {} of interpreter group: {}",
- recoveredClient.getHost() + ":" + recoveredClient.getPort(),
+ LOGGER.info("Recover interpreter process running at {}:{} of interpreter group: {}",
+ recoveredClient.getHost(), recoveredClient.getPort(),
recoveredClient.getInterpreterGroupId());
return recoveredClient;
} else {
recoveryStorage.removeInterpreterClient(context.getInterpreterGroupId());
- LOGGER.warn("Unable to recover interpreter process: " + recoveredClient.getHost() + ":"
- + recoveredClient.getPort() + ", as it is already terminated.");
+ LOGGER.warn("Unable to recover interpreter process: {}:{}, as it is already terminated.", recoveredClient.getHost(), recoveredClient.getPort());
}
}
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 4f7c9a5..5a0a3ab 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -24,7 +24,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.ConnectException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -40,14 +39,16 @@ import java.util.Collections;
*
*/
public class RemoteInterpreterUtils {
- static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class);
+ private RemoteInterpreterUtils() {
+ throw new IllegalStateException("Utility class");
+ }
public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
int port;
try (ServerSocket socket = new ServerSocket(0);) {
port = socket.getLocalPort();
- socket.close();
}
return port;
}
@@ -117,25 +118,14 @@ public class RemoteInterpreterUtils {
}
public static boolean checkIfRemoteEndpointAccessible(String host, int port) {
- try {
- Socket discover = new Socket();
+ try (Socket discover = new Socket()) {
discover.setSoTimeout(1000);
discover.connect(new InetSocketAddress(host, port), 1000);
- discover.close();
return true;
- } catch (ConnectException cne) {
+ } catch (IOException e) {
// end point is not accessible
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " +
- "(might be initializing): " + cne.getMessage());
- }
- return false;
- } catch (IOException ioe) {
- // end point is not accessible
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " +
- "(might be initializing): " + ioe.getMessage());
- }
+ LOGGER.debug("Remote endpoint '{}:{}' is not accessible " +
+ "(might be initializing): {}" , host, port, e.getMessage());
return false;
}
}
@@ -143,7 +133,7 @@ public class RemoteInterpreterUtils {
public static String getInterpreterSettingId(String intpGrpId) {
String settingId = null;
if (intpGrpId != null) {
- int indexOfColon = intpGrpId.indexOf("-");
+ int indexOfColon = intpGrpId.indexOf('-');
settingId = intpGrpId.substring(0, indexOfColon);
}
return settingId;
diff --git a/zeppelin-plugins/launcher/k8s-standard/pom.xml b/zeppelin-plugins/launcher/k8s-standard/pom.xml
index a5a385c..077408e 100644
--- a/zeppelin-plugins/launcher/k8s-standard/pom.xml
+++ b/zeppelin-plugins/launcher/k8s-standard/pom.xml
@@ -37,22 +37,50 @@
<properties>
<plugin.name>Launcher/K8sStandardInterpreterLauncher</plugin.name>
+ <kubernetes.client.version>4.10.2</kubernetes.client.version>
+ <jinjava.version>2.5.4</jinjava.version>
</properties>
<dependencies>
<dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ <version>${kubernetes.client.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.hubspot.jinjava</groupId>
<artifactId>jinjava</artifactId>
- <version>2.4.12</version>
+ <version>${jinjava.version}</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Test libraries -->
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-server-mock</artifactId>
+ <version>${kubernetes.client.version}</version>
+ <scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
- <artifactId>maven-dependency-plugin</artifactId>
- </plugin>
- <plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
@@ -61,6 +89,47 @@
</execution>
</executions>
</plugin>
+ <!-- Shade the whole plugin, because spark provides an other version of okio -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>okio</pattern>
+ <shadedPattern>org.apache.zeppelin.shaded.okio</shadedPattern>
+ </relocation>
+ </relocations>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </plugin>
+ <!-- disable maven-dependency-plugin with execution copy-plugin-dependencies because we shade the whole dependency -->
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-plugin-dependencies</id>
+ <phase>none</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 5735c31..5df2f02 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -1,17 +1,16 @@
package org.apache.zeppelin.interpreter.launcher;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
import java.io.File;
import java.io.IOException;
-import java.util.*;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.hubspot.jinjava.Jinjava;
-import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
@@ -19,10 +18,23 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.hubspot.jinjava.Jinjava;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.LocalPortForward;
+import io.fabric8.kubernetes.client.dsl.ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable;
+
public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private static final Logger LOGGER = LoggerFactory.getLogger(K8sRemoteInterpreterProcess.class);
private static final int K8S_INTERPRETER_SERVICE_PORT = 12321;
- private final Kubectl kubectl;
+ private final KubernetesClient client;
+ private final String namespace;
private final String interpreterGroupId;
private final String interpreterGroupName;
private final String interpreterSettingName;
@@ -31,15 +43,13 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private final Properties properties;
private final Map<String, String> envs;
- private final Gson gson = new Gson();
private final String podName;
private final boolean portForward;
private final String sparkImage;
- private ExecuteWatchdog portForwardWatchdog;
+ private LocalPortForward localPortForward;
private int podPort = K8S_INTERPRETER_SERVICE_PORT;
private final boolean isUserImpersonatedForSpark;
- private String userName;
private AtomicBoolean started = new AtomicBoolean(false);
private Random rand = new Random();
@@ -50,7 +60,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private static final String ENV_SERVICE_DOMAIN = "SERVICE_DOMAIN";
public K8sRemoteInterpreterProcess(
- Kubectl kubectl,
+ KubernetesClient client,
+ String namespace,
File specTemplates,
String containerImage,
String interpreterGroupId,
@@ -66,7 +77,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
boolean isUserImpersonatedForSpark
) {
super(connectTimeout, intpEventServerHost, intpEventServerPort);
- this.kubectl = kubectl;
+ this.client = client;
+ this.namespace = namespace;
this.specTempaltes = specTemplates;
this.containerImage = containerImage;
this.interpreterGroupId = interpreterGroupId;
@@ -85,11 +97,18 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
* Get interpreter pod name
* @return
*/
- @VisibleForTesting
- String getPodName() {
+ public String getPodName() {
return podName;
}
+ /**
+ * Get namespace
+ * @return
+ */
+ public String getNamespace() {
+ return namespace;
+ }
+
@Override
public String getInterpreterGroupId() {
return interpreterGroupId;
@@ -102,76 +121,71 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
@Override
public void start(String userName) throws IOException {
- /**
- * If a spark interpreter process is running, userName is set in preparation for --proxy-user
- */
- if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) {
- this.userName = userName;
- } else {
- this.userName = null;
- }
+
+ Properties templateProperties = getTemplateBindings(userName);
// create new pod
- apply(specTempaltes, false);
- kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", getConnectTimeout()/1000);
+ apply(specTempaltes, false, templateProperties);
if (portForward) {
podPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
- portForwardWatchdog = kubectl.portForward(
- String.format("pod/%s", getPodName()),
- new String[] {
- String.format("%s:%s", podPort, K8S_INTERPRETER_SERVICE_PORT)
- });
+ localPortForward = client.pods().inNamespace(namespace).withName(podName).portForward(K8S_INTERPRETER_SERVICE_PORT, podPort);
}
long startTime = System.currentTimeMillis();
+ long timeoutTime = startTime + getConnectTimeout();
// wait until interpreter send started message through thrift rpc
synchronized (started) {
- if (!started.get()) {
+ while (!started.get()) {
+ long timetoTimeout = timeoutTime - System.currentTimeMillis();
+ if (timetoTimeout <= 0) {
+ stop();
+ throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
+ }
try {
- started.wait(getConnectTimeout());
+ started.wait(timetoTimeout);
} catch (InterruptedException e) {
- LOGGER.error("Remote interpreter is not accessible");
+ LOGGER.error("Interrupt received. Try to stop the interpreter and interrupt the current thread.", e);
+ stop();
+ Thread.currentThread().interrupt();
}
}
}
- if (!started.get()) {
- LOGGER.info("Interpreter pod creation is time out in {} seconds", getConnectTimeout()/1000);
- }
-
// waits for interpreter thrift rpc server ready
- while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
- if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
- break;
- } else {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
+ while (!RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
+ if (System.currentTimeMillis() - timeoutTime > 0) {
+ stop();
+ throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOGGER.error("Interrupt received. Try to stop the interpreter and interrupt the current thread.", e);
+ stop();
+ Thread.currentThread().interrupt();
}
}
}
@Override
public void stop() {
+ Properties templateProperties = getTemplateBindings(null);
// delete pod
try {
- apply(specTempaltes, true);
+ apply(specTempaltes, true, templateProperties);
} catch (IOException e) {
LOGGER.info("Error on removing interpreter pod", e);
}
-
- try {
- kubectl.wait(String.format("pod/%s", getPodName()), "delete", 60);
- } catch (IOException e) {
- LOGGER.debug("Error on waiting pod delete", e);
- }
-
-
- if (portForwardWatchdog != null) {
- portForwardWatchdog.destroyProcess();
+ if (portForward) {
+ try {
+ localPortForward.close();
+ } catch (IOException e) {
+ LOGGER.info("Error on closing portforwarder", e);
+ }
}
+ // Shutdown connection
+ shutdown();
}
@Override
@@ -194,40 +208,24 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
return true;
}
-
- String ret = kubectl.execAndGet(new String[]{
- "get",
- String.format("pods/%s", getPodName()),
- "-o",
- "json"
- });
-
- if (ret == null) {
- return false;
- }
-
- Map<String, Object> pod = gson.fromJson(ret, new TypeToken<Map<String, Object>>() {}.getType());
- if (pod == null || !pod.containsKey("status")) {
- return false;
- }
-
- Map<String, Object> status = (Map<String, Object>) pod.get("status");
- if (status == null || !status.containsKey("phase")) {
- return false;
+ Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
+ if (pod != null) {
+ PodStatus status = pod.getStatus();
+ if (status != null) {
+ return "Running".equals(status.getPhase()) && started.get();
+ }
}
-
- return "Running".equals(status.get("phase")) && started.get();
} catch (Exception e) {
LOGGER.error("Can't get pod status", e);
- return false;
}
+ return false;
}
/**
* Apply spec file(s) in the path.
* @param path
*/
- void apply(File path, boolean delete) throws IOException {
+ void apply(File path, boolean delete, Properties templateProperties) throws IOException {
if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) {
LOGGER.info("Skip {}", path.getAbsolutePath());
}
@@ -240,18 +238,19 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
}
for (File f : files) {
- apply(f, delete);
+ apply(f, delete, templateProperties);
}
} else if (path.isFile()) {
- LOGGER.info("Apply {}", path.getAbsolutePath());
K8sSpecTemplate specTemplate = new K8sSpecTemplate();
- specTemplate.loadProperties(getTemplateBindings());
-
- String spec = specTemplate.render(path);
+ specTemplate.loadProperties(templateProperties);
+ String template = specTemplate.render(path);
+ ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean> k8sObjects = client.load(IOUtils.toInputStream(template, StandardCharsets.UTF_8));
+ LOGGER.info("Apply {} with {} K8s Objects", path.getAbsolutePath(), k8sObjects.get().size());
+ LOGGER.debug(template);
if (delete) {
- kubectl.delete(spec);
+ k8sObjects.inNamespace(namespace).delete();
} else {
- kubectl.apply(spec);
+ k8sObjects.inNamespace(namespace).createOrReplace();
}
} else {
LOGGER.error("Can't apply {}", path.getAbsolutePath());
@@ -259,11 +258,11 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
}
@VisibleForTesting
- Properties getTemplateBindings() {
+ Properties getTemplateBindings(String userName) {
Properties k8sProperties = new Properties();
// k8s template properties
- k8sProperties.put("zeppelin.k8s.namespace", kubectl.getNamespace());
+ k8sProperties.put("zeppelin.k8s.namespace", getNamespace());
k8sProperties.put("zeppelin.k8s.interpreter.pod.name", getPodName());
k8sProperties.put("zeppelin.k8s.interpreter.container.name", interpreterGroupName.toLowerCase());
k8sProperties.put("zeppelin.k8s.interpreter.container.image", containerImage);
@@ -287,7 +286,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
int webUiPort = 4040;
k8sProperties.put("zeppelin.k8s.spark.container.image", sparkImage);
if (isSparkOnKubernetes(properties)) {
- envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions());
+ envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions(userName));
}
envs.put("SPARK_HOME", envs.getOrDefault("SPARK_HOME", "/spark"));
@@ -355,7 +354,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
}
@VisibleForTesting
- String buildSparkSubmitOptions() {
+ String buildSparkSubmitOptions(String userName) {
StringBuilder options = new StringBuilder();
options.append(" --master k8s://https://kubernetes.default.svc");
@@ -363,10 +362,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
if (properties.containsKey(SPARK_DRIVER_MEMROY)) {
options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMROY));
}
- if (userName != null) {
+ if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) {
options.append(" --proxy-user " + userName);
}
- options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
+ options.append(" --conf spark.kubernetes.namespace=" + getNamespace());
options.append(" --conf spark.executor.instances=1");
options.append(" --conf spark.kubernetes.driver.pod.name=" + getPodName());
options.append(" --conf spark.kubernetes.container.image=" + sparkImage);
@@ -381,7 +380,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
private String getInterpreterPodDnsName() {
return String.format("%s.%s.svc",
getPodName(), // service name and pod name is the same
- kubectl.getNamespace());
+ getNamespace());
}
/**
@@ -431,7 +430,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
LOGGER.info("Interpreter pod created {}:{}", host, port);
synchronized (started) {
started.set(true);
- started.notify();
+ started.notifyAll();
}
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java
index 2ed2c13..5abfb48 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java
@@ -16,9 +16,6 @@
*/
package org.apache.zeppelin.interpreter.launcher;
-import com.hubspot.jinjava.Jinjava;
-import org.apache.commons.io.FileUtils;
-
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
@@ -27,6 +24,11 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
+
+import com.hubspot.jinjava.Jinjava;
+import com.hubspot.jinjava.JinjavaConfig;
+
public class K8sSpecTemplate extends HashMap<String, Object> {
public String render(File templateFile) throws IOException {
String template = FileUtils.readFileToString(templateFile, Charset.defaultCharset());
@@ -37,7 +39,8 @@ public class K8sSpecTemplate extends HashMap<String, Object> {
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- Jinjava jinja = new Jinjava();
+ JinjavaConfig config = JinjavaConfig.newBuilder().withLstripBlocks(true).withTrimBlocks(true).build();
+ Jinjava jinja = new Jinjava(config);
return jinja.render(template, this);
} finally {
Thread.currentThread().setContextClassLoader(oldCl);
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index d1df0f3..799a91c 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -18,22 +18,24 @@
package org.apache.zeppelin.interpreter.launcher;
import java.io.File;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
/**
* Interpreter Launcher which use shell script to launch the interpreter process.
@@ -41,23 +43,14 @@ import java.util.Map;
public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
- private final Kubectl kubectl;
private InterpreterLaunchContext context;
-
+ private final KubernetesClient client;
public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException {
super(zConf, recoveryStorage);
- kubectl = new Kubectl(zConf.getK8sKubectlCmd());
- kubectl.setNamespace(getNamespace());
+ client = new DefaultKubernetesClient();
}
- @VisibleForTesting
- K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage, Kubectl kubectl) {
- super(zConf, recoveryStorage);
- this.kubectl = kubectl;
- }
-
-
/**
* Check if i'm running inside of kubernetes or not.
* It should return truth regardless of ZeppelinConfiguration.getRunMode().
@@ -79,9 +72,9 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
*/
String getNamespace() throws IOException {
if (isRunningOnKubernetes()) {
- return readFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace", Charset.defaultCharset()).trim();
+ return readFile(Config.KUBERNETES_NAMESPACE_PATH, Charset.defaultCharset()).trim();
} else {
- return "default";
+ return zConf.getK8sNamepsace();
}
}
@@ -143,10 +136,10 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
this.context = context;
this.properties = context.getProperties();
- int connectTimeout = getConnectTimeout();
return new K8sRemoteInterpreterProcess(
- kubectl,
+ client,
+ getNamespace(),
new File(zConf.getK8sTemplatesDir(), "interpreter"),
zConf.getK8sContainerImage(),
context.getInterpreterGroupId(),
@@ -158,7 +151,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
getZeppelinServiceRpcPort(),
zConf.getK8sPortForward(),
zConf.getK8sSparkContainerImage(),
- connectTimeout,
+ getConnectTimeout(),
isUserImpersonateForSparkInterpreter(context));
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
deleted file mode 100644
index a0ceb0a..0000000
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.commons.exec.*;
-import org.apache.commons.io.IOUtils;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Kubectl {
- private static final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class);
- private final String kubectlCmd;
- private String namespace;
-
- public Kubectl(String kubectlCmd) {
- this.kubectlCmd = kubectlCmd;
- }
-
- /**
- * Override namespace. Otherwise use namespace provided in schema
- * @param namespace
- */
- public void setNamespace(String namespace) {
- this.namespace = namespace;
- }
-
- public String getNamespace() {
- return namespace;
- }
-
- public String apply(String spec) throws IOException {
- return execAndGet(new String[]{"apply", "-f", "-"}, spec);
- }
-
- public String delete(String spec) throws IOException {
- return execAndGet(new String[]{"delete", "-f", "-"}, spec);
- }
-
- public String wait(String resource, String waitFor, int timeoutSec) throws IOException {
- try {
- return execAndGet(new String[]{
- "wait",
- resource,
- String.format("--for=%s", waitFor),
- String.format("--timeout=%ds", timeoutSec)});
- } catch (IOException e) {
- if ("delete".equals(waitFor) && e.getMessage().contains("NotFound")) {
- LOGGER.info("{} Not found. Maybe already deleted.", resource);
- return "";
- } else {
- throw e;
- }
- }
- }
-
- public ExecuteWatchdog portForward(String resource, String [] ports) throws IOException {
- DefaultExecutor executor = new DefaultExecutor();
- CommandLine cmd = new CommandLine(kubectlCmd);
- cmd.addArguments("port-forward");
- cmd.addArguments(resource);
- cmd.addArguments(ports);
-
- ExecuteWatchdog watchdog = new ExecuteWatchdog(-1);
- executor.setWatchdog(watchdog);
-
- executor.execute(cmd, new ExecuteResultHandler() {
- @Override
- public void onProcessComplete(int i) {
- LOGGER.info("Port-forward stopped");
- }
-
- @Override
- public void onProcessFailed(ExecuteException e) {
- LOGGER.debug("port-forward process exit", e);
- }
- });
-
- return watchdog;
- }
-
- String execAndGet(String [] args) throws IOException {
- return execAndGet(args, "");
- }
-
- @VisibleForTesting
- String execAndGet(String [] args, String stdin) throws IOException {
- InputStream ins = IOUtils.toInputStream(stdin, StandardCharsets.UTF_8);
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
- ArrayList<String> argsToOverride = new ArrayList<>(Arrays.asList(args));
-
- // set namespace
- if (namespace != null) {
- argsToOverride.add("--namespace=" + namespace);
- }
-
- LOGGER.info("kubectl {}", argsToOverride);
- LOGGER.debug(stdin);
-
- try {
- int exitCode = execute(
- argsToOverride.toArray(new String[0]),
- ins,
- stdout,
- stderr
- );
-
- if (exitCode == 0) {
- return new String(stdout.toByteArray());
- } else {
- String output = new String(stderr.toByteArray());
- throw new IOException(String.format("non zero return code (%d). %s", exitCode, output));
- }
- } catch (Exception e) {
- String output = new String(stderr.toByteArray());
- throw new IOException(output, e);
- }
- }
-
- public int execute(String [] args, InputStream stdin, OutputStream stdout, OutputStream stderr) throws IOException {
- DefaultExecutor executor = new DefaultExecutor();
- CommandLine cmd = new CommandLine(kubectlCmd);
- cmd.addArguments(args);
-
- ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000L);
- executor.setWatchdog(watchdog);
-
- PumpStreamHandler streamHandler = new PumpStreamHandler(stdout, stderr, stdin);
- executor.setStreamHandler(streamHandler);
- return executor.execute(cmd);
- }
-}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 26467a0..ad4d5fd 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -17,32 +17,34 @@
package org.apache.zeppelin.interpreter.launcher;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.junit.Rule;
+import org.junit.Test;
+
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
public class K8sRemoteInterpreterProcessTest {
+ @Rule
+ public KubernetesServer server = new KubernetesServer(true, true);
+
@Test
public void testGetHostPort() {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
Properties properties = new Properties();
HashMap<String, String> envs = new HashMap<String, String>();
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- kubectl,
+ server.getClient(),
+ "default",
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
@@ -58,21 +60,19 @@ public class K8sRemoteInterpreterProcessTest {
false);
// then
- assertEquals(String.format("%s.%s.svc", intp.getPodName(), kubectl.getNamespace()), intp.getHost());
+ assertEquals(String.format("%s.%s.svc", intp.getPodName(), "default"), intp.getHost());
assertEquals(12321, intp.getPort());
}
@Test
public void testPredefinedPortNumbers() {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
Properties properties = new Properties();
HashMap<String, String> envs = new HashMap<String, String>();
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- kubectl,
+ server.getClient(),
+ "default",
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
@@ -98,16 +98,14 @@ public class K8sRemoteInterpreterProcessTest {
@Test
public void testGetTemplateBindings() throws IOException {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
Properties properties = new Properties();
properties.put("my.key1", "v1");
HashMap<String, String> envs = new HashMap<String, String>();
envs.put("MY_ENV1", "V1");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- kubectl,
+ server.getClient(),
+ "default",
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
@@ -123,7 +121,7 @@ public class K8sRemoteInterpreterProcessTest {
false);
// when
- Properties p = intp.getTemplateBindings();
+ Properties p = intp.getTemplateBindings(null);
// then
assertEquals("default", p.get("zeppelin.k8s.namespace"));
@@ -148,9 +146,6 @@ public class K8sRemoteInterpreterProcessTest {
@Test
public void testGetTemplateBindingsForSpark() throws IOException {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
Properties properties = new Properties();
properties.put("my.key1", "v1");
properties.put("spark.master", "k8s://http://api");
@@ -160,7 +155,8 @@ public class K8sRemoteInterpreterProcessTest {
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- kubectl,
+ server.getClient(),
+ "default",
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
@@ -176,8 +172,7 @@ public class K8sRemoteInterpreterProcessTest {
false);
// when
- intp.start("mytestUser");
- Properties p = intp.getTemplateBindings();
+ Properties p = intp.getTemplateBindings("mytestUser");
// then
assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
@@ -188,7 +183,7 @@ public class K8sRemoteInterpreterProcessTest {
String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
assertTrue(sparkSubmitOptions.startsWith("my options "));
- assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace()));
+ assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
@@ -201,9 +196,6 @@ public class K8sRemoteInterpreterProcessTest {
@Test
public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
Properties properties = new Properties();
properties.put("my.key1", "v1");
properties.put("spark.master", "k8s://http://api");
@@ -213,7 +205,8 @@ public class K8sRemoteInterpreterProcessTest {
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- kubectl,
+ server.getClient(),
+ "default",
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
@@ -229,8 +222,7 @@ public class K8sRemoteInterpreterProcessTest {
true);
// when
- intp.start("mytestUser");
- Properties p = intp.getTemplateBindings();
+ Properties p = intp.getTemplateBindings("mytestUser");
// then
assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
@@ -240,7 +232,7 @@ public class K8sRemoteInterpreterProcessTest {
String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
assertTrue(sparkSubmitOptions.startsWith("my options "));
- assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace()));
+ assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
@@ -253,9 +245,6 @@ public class K8sRemoteInterpreterProcessTest {
@Test
public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOException {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
Properties properties = new Properties();
properties.put("my.key1", "v1");
properties.put("spark.master", "k8s://http://api");
@@ -265,7 +254,8 @@ public class K8sRemoteInterpreterProcessTest {
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- kubectl,
+ server.getClient(),
+ "default",
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
@@ -281,8 +271,7 @@ public class K8sRemoteInterpreterProcessTest {
true);
// when
- intp.start("anonymous");
- Properties p = intp.getTemplateBindings();
+ Properties p = intp.getTemplateBindings("anonymous");
// then
assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
@@ -298,15 +287,13 @@ public class K8sRemoteInterpreterProcessTest {
@Test
public void testSparkUiWebUrlTemplate() {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
Properties properties = new Properties();
HashMap<String, String> envs = new HashMap<String, String>();
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- kubectl,
+ server.getClient(),
+ "default",
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
@@ -341,9 +328,6 @@ public class K8sRemoteInterpreterProcessTest {
@Test
public void testSparkPodResources() {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
Properties properties = new Properties();
properties.put("spark.driver.memory", "1g");
properties.put("spark.driver.cores", "1");
@@ -351,7 +335,8 @@ public class K8sRemoteInterpreterProcessTest {
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- kubectl,
+ server.getClient(),
+ "default",
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
@@ -367,7 +352,7 @@ public class K8sRemoteInterpreterProcessTest {
false);
// when
- Properties p = intp.getTemplateBindings();
+ Properties p = intp.getTemplateBindings(null);
// then
assertEquals("1", p.get("zeppelin.k8s.interpreter.cores"));
@@ -377,9 +362,6 @@ public class K8sRemoteInterpreterProcessTest {
@Test
public void testSparkPodResourcesMemoryOverhead() {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
Properties properties = new Properties();
properties.put("spark.driver.memory", "1g");
properties.put("spark.driver.memoryOverhead", "256m");
@@ -388,7 +370,8 @@ public class K8sRemoteInterpreterProcessTest {
envs.put("SERVICE_DOMAIN", "mydomain");
K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- kubectl,
+ server.getClient(),
+ "default",
new File(".skip"),
"interpreter-container:1.0",
"shared_process",
@@ -404,7 +387,7 @@ public class K8sRemoteInterpreterProcessTest {
false);
// when
- Properties p = intp.getTemplateBindings();
+ Properties p = intp.getTemplateBindings(null);
// then
assertEquals("5", p.get("zeppelin.k8s.interpreter.cores"));
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java
index daf3773..f859cab 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java
@@ -51,6 +51,21 @@ public class K8sSpecTemplateTest {
}
@Test
+ public void testRenderWithStrip() {
+ // given
+ K8sSpecTemplate template = new K8sSpecTemplate();
+ template.put("test", "test");
+ // when
+ String spec = template.render(
+ " {% if test == \"test\" %}\n" +
+ " After commit\n" +
+ " {% endif %}\n");
+
+ // then
+ assertEquals(" After commit\n", spec);
+ }
+
+ @Test
public void testIterate() {
// given
K8sSpecTemplate template = new K8sSpecTemplate();
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
index c580a43..ff47dd0 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
@@ -17,18 +17,16 @@
package org.apache.zeppelin.interpreter.launcher;
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Properties;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.junit.Before;
+import org.junit.Test;
/**
* In the future, test may use minikube on travis for end-to-end test
@@ -46,11 +44,8 @@ public class K8sStandardInterpreterLauncherTest {
@Test
public void testK8sLauncher() throws IOException {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl);
+ K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
@@ -78,11 +73,8 @@ public class K8sStandardInterpreterLauncherTest {
@Test
public void testK8sLauncherWithSparkAndUserImpersonate() throws IOException {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl);
+ K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
@@ -108,22 +100,14 @@ public class K8sStandardInterpreterLauncherTest {
assertTrue(client instanceof K8sRemoteInterpreterProcess);
K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
assertTrue(process.isSpark());
-
- // when
- process.start(context.getUserName());
-
- // then
- assertTrue(process.buildSparkSubmitOptions().contains("--proxy-user user1"));
+ assertTrue(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1"));
}
@Test
public void testK8sLauncherWithSparkAndWithoutUserImpersonate() throws IOException {
// given
- Kubectl kubectl = mock(Kubectl.class);
- when(kubectl.getNamespace()).thenReturn("default");
-
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
- K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl);
+ K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
@@ -149,11 +133,6 @@ public class K8sStandardInterpreterLauncherTest {
assertTrue(client instanceof K8sRemoteInterpreterProcess);
K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
assertTrue(process.isSpark());
-
- // when
- process.start(context.getUserName());
-
- // then
- assertFalse(process.buildSparkSubmitOptions().contains("--proxy-user user1"));
+ assertFalse(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1"));
}
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java
deleted file mode 100644
index 072cf94..0000000
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.commons.io.IOUtils;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class KubectlTest {
-
- @Test(expected = IOException.class)
- public void testKubeclCommandNotExists() throws IOException {
- // given
- Kubectl kubectl = new Kubectl("invalidcommand");
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-
- // when
- kubectl.execute(new String[] {}, null, stdout, stderr);
-
- // then throw IOException
- }
-
- @Test
- public void testStdout() throws IOException {
- // given
- Kubectl kubectl = new Kubectl("echo");
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-
- // when
- kubectl.execute(new String[] {"hello"}, null, stdout, stderr);
-
- // then
- assertEquals("hello\n", stdout.toString());
- assertEquals("", stderr.toString());
- }
-
- @Test
- public void testStderr() throws IOException {
- // given
- Kubectl kubectl = new Kubectl("sh");
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-
- // when
- try {
- kubectl.execute(new String[]{"-c", "yoyo"}, null, stdout, stderr);
- } catch (IOException e) {
- }
-
- // then
- assertEquals("", stdout.toString());
- assertTrue(0 < stderr.toString().length());
- }
-
- @Test
- public void testStdin() throws IOException {
- // given
- Kubectl kubectl = new Kubectl("wc");
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
- InputStream stdin = IOUtils.toInputStream("Hello");
-
- // when
- kubectl.execute(new String[]{"-c"}, stdin, stdout, stderr);
-
- // then
- assertEquals("5", stdout.toString().trim());
- assertEquals("", stderr.toString());
- }
-
- @Test
- public void testExecSpecAndGet() throws IOException {
- // given
- Kubectl kubectl = new Kubectl("cat");
- String spec = "{'k1': 'v1', 'k2': 2}";
-
- // when
- String result = kubectl.execAndGet(new String[]{}, spec);
-
- // then
- assertEquals(spec, result);
- }
-}