You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2021/02/26 12:55:24 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5259] Use
RemoteInterpreterManagedProcess for K8sRemoteInterpreterProcess
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new dff554c [ZEPPELIN-5259] Use RemoteInterpreterManagedProcess for K8sRemoteInterpreterProcess
dff554c is described below
commit dff554ccf9dc023a69c4ab082ea73708981a9ef8
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Tue Feb 16 18:34:40 2021 +0100
[ZEPPELIN-5259] Use RemoteInterpreterManagedProcess for K8sRemoteInterpreterProcess
### What is this PR for?
This PR changes the parent class for K8sRemoteInterpreterProcess to RemoteInterpreterManagedProcess.
It also contains some bug fixes.
I removed the static method `RemoteInterpreterUtils.checkIfRemoteEndpointAccessible` because it is unnecessary and it also prevents testing. Listening for the `processStarted` callback is sufficient, because that callback signals a successful start of the interpreter.
### What type of PR is it?
Improvement and bug fix
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5259
### How should this be tested?
* CI
* Port forwarding was tested locally
### Questions:
* Do the license files need an update? No
* Is there breaking changes for older versions? No
* Does this need documentation? No
Author: Philipp Dallig <ph...@gmail.com>
Closes #4060 from Reamer/K8s_remote_interpreter_manager and squashes the following commits:
26bed4515 [Philipp Dallig] Remove RemoteInterpreterUtils.checkIfRemoteEndpointAccessible and add Kube-Mock tests
ec2843497 [Philipp Dallig] Rewrite port-forward
b6233e264 [Philipp Dallig] Remove duplicate code
8e7ed67ba [Philipp Dallig] Remove context as a local varaible
0051b878a [Philipp Dallig] Use RemoteInterpreterManagedProcess instead of RemoteInterpreterProcess
f010075cb [Philipp Dallig] Move pod random suffix to K8sUtils
(cherry picked from commit 7bf1f248733a74a2ca674f17ce5165de214bba68)
Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
.../launcher/K8sRemoteInterpreterProcess.java | 178 ++++++---------
.../launcher/K8sStandardInterpreterLauncher.java | 10 +-
.../zeppelin/interpreter/launcher/K8sUtils.java | 14 ++
.../launcher/K8sRemoteInterpreterProcessTest.java | 249 ++++++++++++++++++---
.../test/resources/k8s-specs/interpreter-spec.yaml | 162 ++++++++++++++
5 files changed, 459 insertions(+), 154 deletions(-)
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 668343c..e4b39ae 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -25,13 +25,12 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,31 +47,26 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.LocalPortForward;
import io.fabric8.kubernetes.client.dsl.ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable;
-public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
+public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess {
private static final Logger LOGGER = LoggerFactory.getLogger(K8sRemoteInterpreterProcess.class);
private static final int K8S_INTERPRETER_SERVICE_PORT = 12321;
private final KubernetesClient client;
private final String namespace;
- private final String interpreterGroupId;
private final String interpreterGroupName;
- private final String interpreterSettingName;
private final File specTemplates;
private final String containerImage;
private final Properties properties;
- private final Map<String, String> envs;
private final String podName;
- private final boolean portForward;
private final String sparkImage;
+
+ // Pod Forward
+ private final boolean portForward;
private LocalPortForward localPortForward;
- private int podPort = K8S_INTERPRETER_SERVICE_PORT;
- private String errorMessage;
- private final boolean isUserImpersonatedForSpark;
private final boolean timeoutDuringPending;
private AtomicBoolean started = new AtomicBoolean(false);
- private Random rand = new Random();
private static final String SPARK_DRIVER_MEMORY = "spark.driver.memory";
private static final String SPARK_DRIVER_MEMORY_OVERHEAD = "spark.driver.memoryOverhead";
@@ -99,24 +93,29 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
boolean isUserImpersonatedForSpark,
boolean timeoutDuringPending
) {
- super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
+ super(intpEventServerPort,
+ intpEventServerHost,
+ String.format("%d:%d", K8S_INTERPRETER_SERVICE_PORT, K8S_INTERPRETER_SERVICE_PORT),
+ "${ZEPPELIN_HOME}/interpreter/" + interpreterGroupName,
+ "/tmp/local-repo",
+ envs,
+ connectTimeout,
+ connectionPoolSize,
+ interpreterSettingName,
+ interpreterGroupId,
+ isUserImpersonatedForSpark);
this.client = client;
this.namespace = namespace;
this.specTemplates = specTemplates;
this.containerImage = containerImage;
- this.interpreterGroupId = interpreterGroupId;
this.interpreterGroupName = interpreterGroupName;
- this.interpreterSettingName = interpreterSettingName;
this.properties = properties;
- this.envs = new HashMap<>(envs);
this.portForward = portForward;
this.sparkImage = sparkImage;
- this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6);
- this.isUserImpersonatedForSpark = isUserImpersonatedForSpark;
+ this.podName = interpreterGroupName.toLowerCase() + "-" + K8sUtils.getRandomPodSuffix(6);
this.timeoutDuringPending = timeoutDuringPending;
}
-
/**
* Get interpreter pod name
* @return
@@ -134,35 +133,21 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
}
@Override
- public String getInterpreterGroupId() {
- return interpreterGroupId;
- }
-
- @Override
- public String getInterpreterSettingName() {
- return interpreterSettingName;
- }
-
- @Override
public void start(String userName) throws IOException {
Properties templateProperties = getTemplateBindings(userName);
// create new pod
apply(specTemplates, false, templateProperties);
- if (portForward) {
- podPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
- localPortForward = client.pods().inNamespace(namespace).withName(podName).portForward(K8S_INTERPRETER_SERVICE_PORT, podPort);
- }
-
// special handling if we doesn't want timeout the process during lifecycle phase pending
- if (!timeoutDuringPending) {
- while ("pending".equalsIgnoreCase(getPodPhase()) && !Thread.currentThread().isInterrupted()) {
+ if (timeoutDuringPending) {
+ while (!StringUtils.equalsAnyIgnoreCase(getPodPhase(), "Succeeded", "Failed", "Running")
+ && !Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.error("Interrupt received during pending phase. Try to stop the interpreter and interrupt the current thread.", e);
- errorMessage = "Start process was interrupted during the pending phase";
+ processStopped("Start process was interrupted during the pending phase");
stop();
Thread.currentThread().interrupt();
}
@@ -177,7 +162,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
while (!started.get() && !Thread.currentThread().isInterrupted()) {
long timetoTimeout = timeoutTime - System.currentTimeMillis();
if (timetoTimeout <= 0) {
- errorMessage = "The start process was aborted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase();
+ processStopped("The start process was aborted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase());
stop();
throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
}
@@ -185,33 +170,17 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
started.wait(timetoTimeout);
} catch (InterruptedException e) {
LOGGER.error("Interrupt received during started wait. Try to stop the interpreter and interrupt the current thread.", e);
- errorMessage = "The start process was interrupted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase();
+ processStopped("The start process was interrupted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase());
stop();
Thread.currentThread().interrupt();
}
}
}
-
- // waits for interpreter thrift rpc server ready
- while (!RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()) && !Thread.currentThread().isInterrupted()) {
- if (System.currentTimeMillis() - timeoutTime > 0) {
- errorMessage = "The start process was aborted while waiting for the accessibility check of the remote end point. PodPhase before stop: " + getPodPhase();
- stop();
- throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOGGER.error("Interrupt received during remote endpoint accessible check. Try to stop the interpreter and interrupt the current thread.", e);
- errorMessage = "The start process was interrupted while waiting for the accessibility check of the remote end point. PodPhase before stop: " + getPodPhase();
- stop();
- Thread.currentThread().interrupt();
- }
- }
}
@Override
public void stop() {
+ super.stop();
Properties templateProperties = getTemplateBindings(null);
// delete pod
try {
@@ -219,54 +188,32 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
} catch (IOException e) {
LOGGER.info("Error on removing interpreter pod", e);
}
- if (portForward) {
- try {
- localPortForward.close();
- } catch (IOException e) {
- LOGGER.info("Error on closing portforwarder", e);
- }
- }
- // Shutdown connection
- shutdown();
- }
-
- @Override
- public String getHost() {
- if (portForward) {
- return "localhost";
- } else {
- return getInterpreterPodDnsName();
+ if (portForward && localPortForward != null) {
+ LOGGER.info("Stopping Port Forwarding");
+ try {
+ localPortForward.close();
+ } catch (IOException e) {
+ LOGGER.info("Error on closing portforwarder", e);
+ }
}
}
@Override
- public int getPort() {
- return podPort;
+ public boolean isRunning() {
+ return "Running".equalsIgnoreCase(getPodPhase()) && started.get();
}
- @Override
- public boolean isRunning() {
+ public String getPodPhase() {
try {
- if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
- return true;
- }
Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
if (pod != null) {
PodStatus status = pod.getStatus();
if (status != null) {
- return "Running".equals(status.getPhase()) && started.get();
+ return status.getPhase();
}
}
} catch (Exception e) {
- LOGGER.error("Can't get pod status", e);
- }
- return false;
- }
-
- public String getPodPhase() {
- Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
- if (pod != null) {
- return pod.getStatus().getPhase();
+ LOGGER.error("Can't get pod phase", e);
}
return "Unknown";
}
@@ -315,29 +262,30 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
k8sProperties.put("zeppelin.k8s.interpreter.pod.name", getPodName());
k8sProperties.put("zeppelin.k8s.interpreter.container.name", interpreterGroupName.toLowerCase());
k8sProperties.put("zeppelin.k8s.interpreter.container.image", containerImage);
- k8sProperties.put("zeppelin.k8s.interpreter.group.id", interpreterGroupId);
+ k8sProperties.put("zeppelin.k8s.interpreter.group.id", getInterpreterGroupId());
k8sProperties.put("zeppelin.k8s.interpreter.group.name", interpreterGroupName);
- k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName);
- k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo");
- k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", String.format("%d:%d", getPort(), getPort()));
+ k8sProperties.put("zeppelin.k8s.interpreter.setting.name", getInterpreterSettingName());
+ k8sProperties.put("zeppelin.k8s.interpreter.localRepo", getLocalRepoDir());
+ k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", getInterpreterPortRange());
k8sProperties.put("zeppelin.k8s.server.rpc.service", intpEventServerHost);
k8sProperties.put("zeppelin.k8s.server.rpc.portRange", intpEventServerPort);
if (ownerUID() != null && ownerName() != null) {
k8sProperties.put("zeppelin.k8s.server.uid", ownerUID());
k8sProperties.put("zeppelin.k8s.server.pod.name", ownerName());
}
-
+ Map<String, String> k8sEnv = new HashMap<>(getEnv());
// environment variables
- envs.put(ENV_SERVICE_DOMAIN, envs.getOrDefault(ENV_SERVICE_DOMAIN, System.getenv(ENV_SERVICE_DOMAIN)));
- envs.put(ENV_ZEPPELIN_HOME, envs.getOrDefault(ENV_ZEPPELIN_HOME, System.getenv(ENV_ZEPPELIN_HOME)));
+ k8sEnv.put(ENV_SERVICE_DOMAIN, getEnv().getOrDefault(ENV_SERVICE_DOMAIN, System.getenv(ENV_SERVICE_DOMAIN)));
+ k8sEnv.put(ENV_ZEPPELIN_HOME, getEnv().getOrDefault(ENV_ZEPPELIN_HOME, System.getenv(ENV_ZEPPELIN_HOME)));
if (isSpark()) {
int webUiPort = 4040;
k8sProperties.put("zeppelin.k8s.spark.container.image", sparkImage);
if (isSparkOnKubernetes(properties)) {
- envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions(userName));
+ k8sEnv.put("SPARK_SUBMIT_OPTIONS",
+ getEnv().getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions(userName));
}
- envs.put("SPARK_HOME", envs.getOrDefault("SPARK_HOME", "/spark"));
+ k8sEnv.put("SPARK_HOME", getEnv().getOrDefault("SPARK_HOME", "/spark"));
// configure interpreter property "zeppelin.spark.uiWebUrl" if not defined, to enable spark ui through reverse proxy
String webUrl = (String) properties.get("zeppelin.spark.uiWebUrl");
@@ -349,7 +297,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
webUrl,
webUiPort,
getPodName(),
- envs.get(ENV_SERVICE_DOMAIN)
+ k8sEnv.get(ENV_SERVICE_DOMAIN)
));
// Resources of Interpreter Pod
if (properties.containsKey(SPARK_DRIVER_MEMORY)) {
@@ -367,7 +315,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
}
}
- k8sProperties.put("zeppelin.k8s.envs", envs);
+ k8sProperties.put("zeppelin.k8s.envs", k8sEnv);
// interpreter properties overrides the values
k8sProperties.putAll(Maps.fromProperties(properties));
@@ -411,7 +359,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
if (properties.containsKey(SPARK_DRIVER_MEMORY)) {
options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMORY));
}
- if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) {
+ if (isUserImpersonated() && !StringUtils.containsIgnoreCase(userName, "anonymous")) {
options.append(" --proxy-user " + userName);
}
options.append(" --conf spark.kubernetes.namespace=" + getNamespace());
@@ -463,20 +411,22 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
return System.getenv("POD_NAME");
}
- private String getRandomString(int length) {
- char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++) {
- char c = chars[rand.nextInt(chars.length)];
- sb.append(c);
- }
- return sb.toString();
- }
-
@Override
public void processStarted(int port, String host) {
- LOGGER.info("Interpreter pod created {}:{}", host, port);
+ if (portForward) {
+ LOGGER.info("Starting Port Forwarding");
+ try {
+ int localforwardedPodPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ localPortForward = client.pods().inNamespace(namespace).withName(podName)
+ .portForward(K8S_INTERPRETER_SERVICE_PORT, localforwardedPodPort);
+ super.processStarted(localforwardedPodPort, "localhost");
+ } catch (IOException e) {
+ LOGGER.error("Unable to create a PortForward", e);
+ }
+ } else {
+ super.processStarted(port, getInterpreterPodDnsName());
+ }
+ LOGGER.info("Interpreter pod created {}:{}", getHost(), getPort());
synchronized (started) {
started.set(true);
started.notifyAll();
@@ -485,6 +435,6 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
@Override
public String getErrorMessage() {
- return String.format("%s%ncurrent PodPhase: %s", errorMessage, getPodPhase());
+ return String.format("%s%ncurrent PodPhase: %s", super.getErrorMessage(), getPodPhase());
}
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index 289bae3..d76727b 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -43,7 +43,6 @@ import io.fabric8.kubernetes.client.KubernetesClient;
public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
- private InterpreterLaunchContext context;
private final KubernetesClient client;
public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException {
@@ -95,7 +94,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
* return <service-name>.<namespace>.svc
* @throws IOException
*/
- private String getZeppelinService() throws IOException {
+ private String getZeppelinService(InterpreterLaunchContext context) throws IOException {
if (isRunningOnKubernetes()) {
return String.format("%s.%s.svc",
zConf.getK8sServiceName(),
@@ -109,7 +108,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
* get Zeppelin server rpc port
* Read env variable "<HOSTNAME>_SERVICE_PORT_RPC"
*/
- private int getZeppelinServiceRpcPort() {
+ private int getZeppelinServiceRpcPort(InterpreterLaunchContext context) {
String envServicePort = System.getenv(
String.format("%s_SERVICE_PORT_RPC", getHostname().replaceAll("[-.]", "_").toUpperCase()));
if (envServicePort != null) {
@@ -134,7 +133,6 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
@Override
public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
- this.context = context;
this.properties = context.getProperties();
return new K8sRemoteInterpreterProcess(
@@ -147,8 +145,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
context.getInterpreterSettingName(),
properties,
buildEnvFromProperties(context),
- getZeppelinService(),
- getZeppelinServiceRpcPort(),
+ getZeppelinService(context),
+ getZeppelinServiceRpcPort(context),
zConf.getK8sPortForward(),
zConf.getK8sSparkContainerImage(),
getConnectTimeout(),
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java
index dd0e05f..e09c517 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.interpreter.launcher;
+import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -31,6 +32,8 @@ public class K8sUtils {
private static final long T = G * K;
private static final long MINIMUM_OVERHEAD = 384;
+ private static final Random rand = new Random();
+
private K8sUtils() {
// do nothing
}
@@ -74,4 +77,15 @@ public class K8sUtils {
}
return memoryAmountBytes;
}
+
+ public static String getRandomPodSuffix(int length) {
+ char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ char c = chars[rand.nextInt(chars.length)];
+ sb.append(c);
+ }
+ return sb.toString();
+ }
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 95cd604..a8e9b30 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -19,16 +19,27 @@ package org.apache.zeppelin.interpreter.launcher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.junit.Rule;
import org.junit.Test;
-
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
public class K8sRemoteInterpreterProcessTest {
@@ -37,36 +48,6 @@ public class K8sRemoteInterpreterProcessTest {
public KubernetesServer server = new KubernetesServer(true, true);
@Test
- public void testGetHostPort() {
- // given
- Properties properties = new Properties();
- HashMap<String, String> envs = new HashMap<String, String>();
-
- K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
- server.getClient(),
- "default",
- new File(".skip"),
- "interpreter-container:1.0",
- "shared_process",
- "sh",
- "shell",
- properties,
- envs,
- "zeppelin.server.hostname",
- 12320,
- false,
- "spark-container:1.0",
- 10,
- 10,
- false,
- false);
-
- // then
- assertEquals(String.format("%s.%s.svc", intp.getPodName(), "default"), intp.getHost());
- assertEquals(12321, intp.getPort());
- }
-
- @Test
public void testPredefinedPortNumbers() {
// given
Properties properties = new Properties();
@@ -94,7 +75,7 @@ public class K8sRemoteInterpreterProcessTest {
// following values are hardcoded in k8s/interpreter/100-interpreter.yaml.
// when change those values, update the yaml file as well.
- assertEquals(12321, intp.getPort());
+ assertEquals("12321:12321", intp.getInterpreterPortRange());
assertEquals(22321, intp.getSparkDriverPort());
assertEquals(22322, intp.getSparkBlockmanagerPort());
}
@@ -194,7 +175,7 @@ public class K8sRemoteInterpreterProcessTest {
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
- assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
+ assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
assertFalse(sparkSubmitOptions.contains("--proxy-user"));
@@ -245,7 +226,7 @@ public class K8sRemoteInterpreterProcessTest {
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
- assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
+ assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser"));
@@ -412,4 +393,204 @@ public class K8sRemoteInterpreterProcessTest {
assertEquals("1280Mi", p.get("zeppelin.k8s.interpreter.memory"));
}
+ @Test
+ public void testK8sStartSuccessful() throws IOException, InterruptedException {
+ // given
+ Properties properties = new Properties();
+ HashMap<String, String> envs = new HashMap<String, String>();
+ envs.put("SERVICE_DOMAIN", "mydomain");
+ URL url = Thread.currentThread().getContextClassLoader()
+ .getResource("k8s-specs/interpreter-spec.yaml");
+ File file = new File(url.getPath());
+
+ K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+ server.getClient(),
+ "default",
+ file,
+ "interpreter-container:1.0",
+ "shared_process",
+ "spark",
+ "myspark",
+ properties,
+ envs,
+ "zeppelin.server.service",
+ 12320,
+ false,
+ "spark-container:1.0",
+ 10000,
+ 10,
+ false,
+ true);
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service
+ .submit(new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp));
+ intp.start("TestUser");
+ // then
+ assertEquals("Running", intp.getPodPhase());
+ }
+
+ @Test
+ public void testK8sStartFailed() throws IOException, InterruptedException {
+ // given
+ Properties properties = new Properties();
+ HashMap<String, String> envs = new HashMap<String, String>();
+ envs.put("SERVICE_DOMAIN", "mydomain");
+ URL url = Thread.currentThread().getContextClassLoader()
+ .getResource("k8s-specs/interpreter-spec.yaml");
+ File file = new File(url.getPath());
+
+ K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+ server.getClient(),
+ "default",
+ file,
+ "interpreter-container:1.0",
+ "shared_process",
+ "spark",
+ "myspark",
+ properties,
+ envs,
+ "zeppelin.server.service",
+ 12320,
+ false,
+ "spark-container:1.0",
+ 3000,
+ 10,
+ false,
+ true);
+ PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp);
+ podStatusSimulator.setSecondPhase("Failed");
+ podStatusSimulator.setSuccessfullStart(false);
+ ExecutorService service = Executors.newFixedThreadPool(1);
+ service
+ .submit(podStatusSimulator);
+ // should throw an IOException
+ try {
+ intp.start("TestUser");
+ fail("We excepting an IOException");
+ } catch (IOException e) {
+ assertNotNull(e);
+ // Check that the Pod is deleted
+ assertNull(
+ server.getClient().pods().inNamespace(intp.getNamespace()).withName(intp.getPodName())
+ .get());
+ }
+ }
+
+ @Test
+ public void testK8sStartTimeoutPending() throws IOException, InterruptedException {
+ // given
+ Properties properties = new Properties();
+ HashMap<String, String> envs = new HashMap<String, String>();
+ envs.put("SERVICE_DOMAIN", "mydomain");
+ URL url = Thread.currentThread().getContextClassLoader()
+ .getResource("k8s-specs/interpreter-spec.yaml");
+ File file = new File(url.getPath());
+
+ K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+ server.getClient(),
+ "default",
+ file,
+ "interpreter-container:1.0",
+ "shared_process",
+ "spark",
+ "myspark",
+ properties,
+ envs,
+ "zeppelin.server.service",
+ 12320,
+ false,
+ "spark-container:1.0",
+ 3000,
+ 10,
+ false,
+ true);
+ PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp);
+ podStatusSimulator.setFirstPhase("Pending");
+ podStatusSimulator.setSecondPhase("Pending");
+ podStatusSimulator.setSuccessfullStart(false);
+ ExecutorService service = Executors.newFixedThreadPool(2);
+ service
+ .submit(podStatusSimulator);
+ service.submit(() -> {
+ try {
+ intp.start("TestUser");
+ fail("We interrupt, this line of code should not be executed.");
+ } catch (IOException e) {
+ fail("We interrupt, this line of code should not be executed.");
+ }
+ });
+ // wait a little bit
+ TimeUnit.SECONDS.sleep(5);
+ service.shutdownNow();
+ // wait for a shutdown
+ service.awaitTermination(10, TimeUnit.SECONDS);
+ // Check that the Pod is deleted
+ assertNull(server.getClient().pods().inNamespace(intp.getNamespace())
+ .withName(intp.getPodName()).get());
+
+ }
+
+ class PodStatusSimulator implements Runnable {
+
+ private final KubernetesClient client;
+ private final String namespace;
+ private final String podName;
+ private final RemoteInterpreterManagedProcess process;
+
+ private String firstPhase = "Pending";
+ private String secondPhase = "Running";
+ private boolean successfulStart = true;
+
+ public PodStatusSimulator(
+ KubernetesClient client,
+ String namespace,
+ String podName,
+ RemoteInterpreterManagedProcess process) {
+ this.client = client;
+ this.namespace = namespace;
+ this.podName = podName;
+ this.process = process;
+ }
+
+ public void setFirstPhase(String phase) {
+ this.firstPhase = phase;
+ }
+ public void setSecondPhase(String phase) {
+ this.secondPhase = phase;
+ }
+ public void setSuccessfullStart(boolean successful) {
+ this.successfulStart = successful;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Instant timeoutTime = Instant.now().plusSeconds(10);
+ while (timeoutTime.isAfter(Instant.now())) {
+ Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
+ if (pod != null) {
+ TimeUnit.SECONDS.sleep(1);
+ // Update Pod to "pending" phase
+ pod.setStatus(new PodStatus(null, null, null, null, null, null, null, firstPhase,
+ null,
+ null, null, null, null));
+ client.pods().inNamespace(namespace).updateStatus(pod);
+ // Update Pod to "Running" phase
+ pod.setStatus(new PodStatus(null, null, null, null, null, null, null, secondPhase,
+ null,
+ null, null, null, null));
+ client.pods().inNamespace(namespace).updateStatus(pod);
+ TimeUnit.SECONDS.sleep(1);
+ if (successfulStart) {
+ process.processStarted(12320, "testing");
+ }
+ break;
+ } else {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ }
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
new file mode 100644
index 0000000..116b0df
--- /dev/null
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+kind: Pod
+apiVersion: v1
+metadata:
+ namespace: {{zeppelin.k8s.namespace}}
+ name: {{zeppelin.k8s.interpreter.pod.name}}
+ labels:
+ app: {{zeppelin.k8s.interpreter.pod.name}}
+ interpreterGroupId: {{zeppelin.k8s.interpreter.group.id}}
+ interpreterSettingName: {{zeppelin.k8s.interpreter.setting.name}}
+ {% if zeppelin.k8s.server.uid is defined %}
+ ownerReferences:
+ - apiVersion: v1
+ controller: false
+ blockOwnerDeletion: false
+ kind: Pod
+ name: {{zeppelin.k8s.server.pod.name}}
+ uid: {{zeppelin.k8s.server.uid}}
+ {% endif %}
+spec:
+ {% if zeppelin.k8s.interpreter.group.name == "spark" %}
+ automountServiceAccountToken: true
+ {% else %}
+ automountServiceAccountToken: false
+ {% endif %}
+ restartPolicy: Never
+ terminationGracePeriodSeconds: 30
+ containers:
+ - name: {{zeppelin.k8s.interpreter.container.name}}
+ image: {{zeppelin.k8s.interpreter.container.image}}
+ args:
+ - "$(ZEPPELIN_HOME)/bin/interpreter.sh"
+ - "-d"
+ - "$(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}}"
+ - "-r"
+ - "{{zeppelin.k8s.interpreter.rpc.portRange}}"
+ - "-c"
+ - "{{zeppelin.k8s.server.rpc.service}}"
+ - "-p"
+ - "{{zeppelin.k8s.server.rpc.portRange}}"
+ - "-i"
+ - "{{zeppelin.k8s.interpreter.group.id}}"
+ - "-l"
+ - "{{zeppelin.k8s.interpreter.localRepo}}/{{zeppelin.k8s.interpreter.setting.name}}"
+ - "-g"
+ - "{{zeppelin.k8s.interpreter.setting.name}}"
+ env:
+ {% for key, value in zeppelin.k8s.envs.items() %}
+ - name: {{key}}
+ value: {{value}}
+ {% endfor %}
+ {% if zeppelin.k8s.interpreter.cores is defined and zeppelin.k8s.interpreter.memory is defined %}
+ resources:
+ requests:
+ memory: "{{zeppelin.k8s.interpreter.memory}}"
+ cpu: "{{zeppelin.k8s.interpreter.cores}}"
+{# limits.memory is deliberately not set: a container that exceeds its memory limit is killed by the OOM killer. See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits #}
+ limits:
+ cpu: "{{zeppelin.k8s.interpreter.cores}}"
+ {% endif %}
+ {% if zeppelin.k8s.interpreter.group.name == "spark" %}
+ volumeMounts:
+ - name: spark-home
+ mountPath: /spark
+ initContainers:
+ - name: spark-home-init
+ image: {{zeppelin.k8s.spark.container.image}}
+ command: ["sh", "-c", "cp -r /opt/spark/* /spark/"]
+ volumeMounts:
+ - name: spark-home
+ mountPath: /spark
+ volumes:
+ - name: spark-home
+ emptyDir: {}
+ {% endif %}
+---
+kind: Service
+apiVersion: v1
+metadata:
+ namespace: {{zeppelin.k8s.namespace}}
+ name: {{zeppelin.k8s.interpreter.pod.name}} # keep the Service name the same as the Pod name.
+ {% if zeppelin.k8s.server.uid is defined %}
+ ownerReferences:
+ - apiVersion: v1
+ controller: false
+ blockOwnerDeletion: false
+ kind: Pod
+ name: {{zeppelin.k8s.server.pod.name}}
+ uid: {{zeppelin.k8s.server.uid}}
+ {% endif %}
+spec:
+ clusterIP: None
+ ports:
+ - name: intp
+ port: 12321
+ {% if zeppelin.k8s.interpreter.group.name == "spark" %}
+ - name: spark-driver
+ port: 22321
+ - name: spark-blockmanager
+ port: 22322
+ - name: spark-ui
+ port: 4040
+ {% endif %}
+ selector:
+ app: {{zeppelin.k8s.interpreter.pod.name}}
+{% if zeppelin.k8s.interpreter.group.name == "spark" %}
+---
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: {{zeppelin.k8s.interpreter.pod.name}}
+ namespace: {{zeppelin.k8s.namespace}}
+ {% if zeppelin.k8s.server.uid is defined %}
+ ownerReferences:
+ - apiVersion: v1
+ controller: false
+ blockOwnerDeletion: false
+ kind: Pod
+ name: {{zeppelin.k8s.server.pod.name}}
+ uid: {{zeppelin.k8s.server.uid}}
+ {% endif %}
+rules:
+- apiGroups: [""]
+ resources: ["pods", "services"]
+ verbs: ["create", "get", "update", "list", "delete", "watch" ]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: {{zeppelin.k8s.interpreter.pod.name}}
+ {% if zeppelin.k8s.server.uid is defined %}
+ ownerReferences:
+ - apiVersion: v1
+ controller: false
+ blockOwnerDeletion: false
+ kind: Pod
+ name: {{zeppelin.k8s.server.pod.name}}
+ uid: {{zeppelin.k8s.server.uid}}
+ {% endif %}
+subjects:
+- kind: ServiceAccount
+ name: default
+roleRef:
+ kind: Role
+ name: {{zeppelin.k8s.interpreter.pod.name}}
+ apiGroup: rbac.authorization.k8s.io
+{% endif %}
\ No newline at end of file