You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2021/06/08 06:16:46 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5400] Polish K8s
launcher
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 371fd42 [ZEPPELIN-5400] Polish K8s launcher
371fd42 is described below
commit 371fd42b3933860ba1feb52e7c5c6cf3600eb2a8
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Tue Jun 8 07:04:18 2021 +0200
[ZEPPELIN-5400] Polish K8s launcher
### What is this PR for?
This PR polish the K8s launcher and tries to fix the flapping UT test
### What type of PR is it?
- Refactoring
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5400
### How should this be tested?
* CI
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Philipp Dallig <ph...@gmail.com>
Closes #4129 from Reamer/k8s_improvements and squashes the following commits:
08a149723 [Philipp Dallig] ignore flaky unit tests
c551daf8a [Philipp Dallig] Polish k8s launcher
(cherry picked from commit 61788c44469203db200360e1d99af77da4e9c140)
Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
.../launcher/K8sRemoteInterpreterProcess.java | 56 +++++++++++-----------
.../launcher/K8sStandardInterpreterLauncher.java | 17 ++++---
.../launcher/K8sRemoteInterpreterProcessTest.java | 6 +--
.../interpreter/launcher/PodPhaseWatcherTest.java | 3 ++
4 files changed, 42 insertions(+), 40 deletions(-)
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 80a6f15..3711bee 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -70,7 +70,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
private final boolean timeoutDuringPending;
- private AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean started = new AtomicBoolean(false);
private static final String SPARK_DRIVER_MEMORY = "spark.driver.memory";
private static final String SPARK_DRIVER_MEMORY_OVERHEAD = "spark.driver.memoryOverhead";
@@ -122,16 +122,14 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
}
/**
- * Get interpreter pod name
- * @return
+ * @return Get interpreter pod name
*/
public String getPodName() {
return podName;
}
/**
- * Get namespace
- * @return
+ * @return Get namespace
*/
public String getNamespace() {
return namespace;
@@ -165,14 +163,14 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
// wait until interpreter send started message through thrift rpc
synchronized (started) {
while (!started.get() && !Thread.currentThread().isInterrupted()) {
- long timetoTimeout = timeoutTime - System.currentTimeMillis();
- if (timetoTimeout <= 0) {
+ long timeToTimeout = timeoutTime - System.currentTimeMillis();
+ if (timeToTimeout <= 0) {
processStopped("The start process was aborted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase());
stop();
throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
}
try {
- started.wait(timetoTimeout);
+ started.wait(timeToTimeout);
} catch (InterruptedException e) {
LOGGER.error("Interrupt received during started wait. Try to stop the interpreter and interrupt the current thread.", e);
processStopped("The start process was interrupted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase());
@@ -189,7 +187,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
// WATCH for soft shutdown
PodPhaseWatcher podWatcher = new PodPhaseWatcher(phase -> StringUtils.equalsAny(phase, "Succeeded", "Failed"));
try (Watch watch = client.pods().inNamespace(namespace).withName(podName).watch(podWatcher)) {
- if (!podWatcher.getCountDownLatch().await(RemoteInterpreterServer.DEFAULT_SHUTDOWN_TIMEOUT + 500,
+ if (!podWatcher.getCountDownLatch().await(RemoteInterpreterServer.DEFAULT_SHUTDOWN_TIMEOUT + 500L,
TimeUnit.MILLISECONDS)) {
LOGGER.warn("Pod {} doesn't terminate in time", podName);
}
@@ -210,7 +208,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
try {
localPortForward.close();
} catch (IOException e) {
- LOGGER.info("Error on closing portforwarder", e);
+ LOGGER.info("Error on closing Port Forwarding", e);
}
}
}
@@ -236,7 +234,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
}
/**
* Apply spec file(s) in the path.
- * @param path
+ * @param path Path to the K8s resources
+ * @param delete set to true, the K8s resources are deleted
+ * @param templateProperties properties to enrich the template
*/
void apply(File path, boolean delete, Properties templateProperties) throws IOException {
if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) {
@@ -362,8 +362,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
return "spark".equalsIgnoreCase(interpreterGroupName);
}
- boolean isSparkOnKubernetes(Properties interpreteProperties) {
- String propertySparkMaster = (String) interpreteProperties.getOrDefault("spark.master", "");
+ boolean isSparkOnKubernetes(Properties interpreterProperties) {
+ String propertySparkMaster = (String) interpreterProperties.getOrDefault("spark.master", "");
return propertySparkMaster.startsWith("k8s://");
}
@@ -374,19 +374,19 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
options.append(" --master k8s://https://kubernetes.default.svc");
options.append(" --deploy-mode client");
if (properties.containsKey(SPARK_DRIVER_MEMORY)) {
- options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMORY));
+ options.append(" --driver-memory ").append(properties.get(SPARK_DRIVER_MEMORY));
}
if (isUserImpersonated() && !StringUtils.containsIgnoreCase(userName, "anonymous")) {
- options.append(" --proxy-user " + userName);
+ options.append(" --proxy-user ").append(userName);
}
- options.append(" --conf spark.kubernetes.namespace=" + getNamespace());
+ options.append(" --conf spark.kubernetes.namespace=").append(getNamespace());
options.append(" --conf spark.executor.instances=1");
- options.append(" --conf spark.kubernetes.driver.pod.name=" + getPodName());
- options.append(" --conf spark.kubernetes.container.image=" + sparkImage);
+ options.append(" --conf spark.kubernetes.driver.pod.name=").append(getPodName());
+ options.append(" --conf spark.kubernetes.container.image=").append(sparkImage);
options.append(" --conf spark.driver.bindAddress=0.0.0.0");
- options.append(" --conf spark.driver.host=" + getInterpreterPodDnsName());
- options.append(" --conf spark.driver.port=" + String.format("%d", getSparkDriverPort()));
- options.append(" --conf spark.blockManager.port=" + String.format("%d", getSparkBlockmanagerPort()));
+ options.append(" --conf spark.driver.host=").append(getInterpreterPodDnsName());
+ options.append(" --conf spark.driver.port=").append(getSparkDriverPort());
+ options.append(" --conf spark.blockManager.port=").append(getSparkBlockManagerPort());
return options.toString();
}
@@ -399,7 +399,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
/**
* See xxx-interpreter-pod.yaml
- * @return
+ * @return SparkDriverPort
*/
@VisibleForTesting
int getSparkDriverPort() {
@@ -408,10 +408,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
/**
* See xxx-interpreter-pod.yaml
- * @return
+ * @return Spark block manager port
*/
@VisibleForTesting
- int getSparkBlockmanagerPort() {
+ int getSparkBlockManagerPort() {
return 22322;
}
@@ -433,10 +433,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
if (portForward) {
LOGGER.info("Starting Port Forwarding");
try {
- int localforwardedPodPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+ int localForwardedPodPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
localPortForward = client.pods().inNamespace(namespace).withName(podName)
- .portForward(K8S_INTERPRETER_SERVICE_PORT, localforwardedPodPort);
- super.processStarted(localforwardedPodPort, "localhost");
+ .portForward(K8S_INTERPRETER_SERVICE_PORT, localForwardedPodPort);
+ super.processStarted(localForwardedPodPort, "localhost");
} catch (IOException e) {
LOGGER.error("Unable to create a PortForward", e);
}
@@ -452,6 +452,6 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
@Override
public String getErrorMessage() {
- return String.format("%s%ncurrent PodPhase: %s", super.getErrorMessage(), getPodPhase());
+ return String.format("%s%n current PodPhase: %s", super.getErrorMessage(), getPodPhase());
}
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index d76727b..bd620fe 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -45,7 +45,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
private final KubernetesClient client;
- public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException {
+ public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
super(zConf, recoveryStorage);
client = new DefaultKubernetesClient();
}
@@ -59,7 +59,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
* to use K8sStandardInterpreterLauncher. This is useful for development. It allows Zeppelin server running on your
* IDE and creates your interpreters in Kubernetes. So any code changes on Zeppelin server or kubernetes yaml spec
* can be applied without re-building docker image.
- * @return
+ * @return true, if running on K8s
*/
boolean isRunningOnKubernetes() {
return new File("/var/run/secrets/kubernetes.io").exists();
@@ -67,7 +67,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
/**
* Get current namespace
- * @throws IOException
+ * @throws IOException if namespace file could not be read
*/
String getNamespace() throws IOException {
if (isRunningOnKubernetes()) {
@@ -78,8 +78,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
}
/**
- * Get hostname. It should be the same to Service name (and Pod name) of the Kubernetes
- * @return
+ * @return Get hostname. It should be the same to Service name (and Pod name) of the Kubernetes or
+ * localhost in case of an UnknownHostException
*/
String getHostname() {
try {
@@ -90,9 +90,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
}
/**
- * get Zeppelin service.
- * return <service-name>.<namespace>.svc
- * @throws IOException
+ * @return get Zeppelin service. <service-name>.<namespace>.svc
+ * @throws IOException if the Zeppelin service cannot be generated
*/
private String getZeppelinService(InterpreterLaunchContext context) throws IOException {
if (isRunningOnKubernetes()) {
@@ -122,7 +121,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
* Interpreter Process will run in K8s. There is no point in changing the user after starting the container.
* Switching to an other user (non-privileged) should be done during the image creation process.
*
- * Only if a spark interpreter process is running, userImpersonatation should be possible for --proxy-user
+ * Only if a spark interpreter process is running, user impersonation should be possible for --proxy-user
*/
private boolean isUserImpersonateForSparkInterpreter(InterpreterLaunchContext context) {
return zConf.getZeppelinImpersonateSparkProxyUser() &&
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index dbc4555..eafad5a 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -78,7 +78,7 @@ public class K8sRemoteInterpreterProcessTest {
// when change those values, update the yaml file as well.
assertEquals("12321:12321", intp.getInterpreterPortRange());
assertEquals(22321, intp.getSparkDriverPort());
- assertEquals(22322, intp.getSparkBlockmanagerPort());
+ assertEquals(22322, intp.getSparkBlockManagerPort());
}
@Test
@@ -178,7 +178,7 @@ public class K8sRemoteInterpreterProcessTest {
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
- assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
+ assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort()));
assertFalse(sparkSubmitOptions.contains("--proxy-user"));
assertTrue(intp.isSpark());
}
@@ -229,7 +229,7 @@ public class K8sRemoteInterpreterProcessTest {
assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
- assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
+ assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort()));
assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser"));
assertTrue(intp.isSpark());
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
index aa08352..735800b 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -41,6 +42,7 @@ public class PodPhaseWatcherTest {
public KubernetesServer server = new KubernetesServer(false, true);
@Test
+ @Ignore("Reamer - ZEPPELIN-5403")
public void testPhase() throws InterruptedException {
KubernetesClient client = server.getClient();
// CREATE
@@ -71,6 +73,7 @@ public class PodPhaseWatcherTest {
}
@Test
+ @Ignore("Reamer - ZEPPELIN-5403")
public void testPhaseWithError() throws InterruptedException {
KubernetesClient client = server.getClient();
// CREATE