You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2021/06/08 06:16:46 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5400] Polish K8s launcher

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 371fd42  [ZEPPELIN-5400] Polish K8s launcher
371fd42 is described below

commit 371fd42b3933860ba1feb52e7c5c6cf3600eb2a8
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Tue Jun 8 07:04:18 2021 +0200

    [ZEPPELIN-5400] Polish K8s launcher
    
    ### What is this PR for?
    This PR polish the K8s launcher and tries to fix the flapping UT test
    
    ### What type of PR is it?
     - Refactoring
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5400
    
    ### How should this be tested?
    * CI
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Philipp Dallig <ph...@gmail.com>
    
    Closes #4129 from Reamer/k8s_improvements and squashes the following commits:
    
    08a149723 [Philipp Dallig] ignore flaky unit tests
    c551daf8a [Philipp Dallig] Polish k8s launcher
    
    (cherry picked from commit 61788c44469203db200360e1d99af77da4e9c140)
    Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
 .../launcher/K8sRemoteInterpreterProcess.java      | 56 +++++++++++-----------
 .../launcher/K8sStandardInterpreterLauncher.java   | 17 ++++---
 .../launcher/K8sRemoteInterpreterProcessTest.java  |  6 +--
 .../interpreter/launcher/PodPhaseWatcherTest.java  |  3 ++
 4 files changed, 42 insertions(+), 40 deletions(-)

diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 80a6f15..3711bee 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -70,7 +70,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
 
   private final boolean timeoutDuringPending;
 
-  private AtomicBoolean started = new AtomicBoolean(false);
+  private final AtomicBoolean started = new AtomicBoolean(false);
 
   private static final String SPARK_DRIVER_MEMORY = "spark.driver.memory";
   private static final String SPARK_DRIVER_MEMORY_OVERHEAD = "spark.driver.memoryOverhead";
@@ -122,16 +122,14 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
   }
 
   /**
-   * Get interpreter pod name
-   * @return
+   * @return Get interpreter pod name
    */
   public String getPodName() {
     return podName;
   }
 
   /**
-   * Get namespace
-   * @return
+   * @return Get namespace
    */
   public String getNamespace() {
     return namespace;
@@ -165,14 +163,14 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
     // wait until interpreter send started message through thrift rpc
     synchronized (started) {
       while (!started.get() && !Thread.currentThread().isInterrupted()) {
-        long timetoTimeout = timeoutTime - System.currentTimeMillis();
-        if (timetoTimeout <= 0) {
+        long timeToTimeout = timeoutTime - System.currentTimeMillis();
+        if (timeToTimeout <= 0) {
           processStopped("The start process was aborted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase());
           stop();
           throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
         }
         try {
-          started.wait(timetoTimeout);
+          started.wait(timeToTimeout);
         } catch (InterruptedException e) {
           LOGGER.error("Interrupt received during started wait. Try to stop the interpreter and interrupt the current thread.", e);
           processStopped("The start process was interrupted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase());
@@ -189,7 +187,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
     // WATCH for soft shutdown
     PodPhaseWatcher podWatcher = new PodPhaseWatcher(phase -> StringUtils.equalsAny(phase, "Succeeded", "Failed"));
     try (Watch watch = client.pods().inNamespace(namespace).withName(podName).watch(podWatcher)) {
-      if (!podWatcher.getCountDownLatch().await(RemoteInterpreterServer.DEFAULT_SHUTDOWN_TIMEOUT + 500,
+      if (!podWatcher.getCountDownLatch().await(RemoteInterpreterServer.DEFAULT_SHUTDOWN_TIMEOUT + 500L,
           TimeUnit.MILLISECONDS)) {
         LOGGER.warn("Pod {} doesn't terminate in time", podName);
       }
@@ -210,7 +208,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
       try {
         localPortForward.close();
       } catch (IOException e) {
-        LOGGER.info("Error on closing portforwarder", e);
+        LOGGER.info("Error on closing Port Forwarding", e);
       }
     }
   }
@@ -236,7 +234,9 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
   }
   /**
    * Apply spec file(s) in the path.
-   * @param path
+   * @param path Path to the K8s resources
+   * @param delete set to true, the K8s resources are deleted
+   * @param templateProperties properties to enrich the template
    */
   void apply(File path, boolean delete, Properties templateProperties) throws IOException {
     if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) {
@@ -362,8 +362,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
     return "spark".equalsIgnoreCase(interpreterGroupName);
   }
 
-  boolean isSparkOnKubernetes(Properties interpreteProperties) {
-    String propertySparkMaster = (String) interpreteProperties.getOrDefault("spark.master", "");
+  boolean isSparkOnKubernetes(Properties interpreterProperties) {
+    String propertySparkMaster = (String) interpreterProperties.getOrDefault("spark.master", "");
     return propertySparkMaster.startsWith("k8s://");
   }
 
@@ -374,19 +374,19 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
     options.append(" --master k8s://https://kubernetes.default.svc");
     options.append(" --deploy-mode client");
     if (properties.containsKey(SPARK_DRIVER_MEMORY)) {
-      options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMORY));
+      options.append(" --driver-memory ").append(properties.get(SPARK_DRIVER_MEMORY));
     }
     if (isUserImpersonated() && !StringUtils.containsIgnoreCase(userName, "anonymous")) {
-      options.append(" --proxy-user " + userName);
+      options.append(" --proxy-user ").append(userName);
     }
-    options.append(" --conf spark.kubernetes.namespace=" + getNamespace());
+    options.append(" --conf spark.kubernetes.namespace=").append(getNamespace());
     options.append(" --conf spark.executor.instances=1");
-    options.append(" --conf spark.kubernetes.driver.pod.name=" + getPodName());
-    options.append(" --conf spark.kubernetes.container.image=" + sparkImage);
+    options.append(" --conf spark.kubernetes.driver.pod.name=").append(getPodName());
+    options.append(" --conf spark.kubernetes.container.image=").append(sparkImage);
     options.append(" --conf spark.driver.bindAddress=0.0.0.0");
-    options.append(" --conf spark.driver.host=" + getInterpreterPodDnsName());
-    options.append(" --conf spark.driver.port=" + String.format("%d", getSparkDriverPort()));
-    options.append(" --conf spark.blockManager.port=" + String.format("%d", getSparkBlockmanagerPort()));
+    options.append(" --conf spark.driver.host=").append(getInterpreterPodDnsName());
+    options.append(" --conf spark.driver.port=").append(getSparkDriverPort());
+    options.append(" --conf spark.blockManager.port=").append(getSparkBlockManagerPort());
 
     return options.toString();
   }
@@ -399,7 +399,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
 
   /**
    * See xxx-interpreter-pod.yaml
-   * @return
+   * @return SparkDriverPort
    */
   @VisibleForTesting
   int getSparkDriverPort() {
@@ -408,10 +408,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
 
   /**
    * See xxx-interpreter-pod.yaml
-   * @return
+   * @return Spark block manager port
    */
   @VisibleForTesting
-  int getSparkBlockmanagerPort() {
+  int getSparkBlockManagerPort() {
     return 22322;
   }
 
@@ -433,10 +433,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
     if (portForward) {
       LOGGER.info("Starting Port Forwarding");
       try {
-        int localforwardedPodPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+        int localForwardedPodPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
         localPortForward = client.pods().inNamespace(namespace).withName(podName)
-            .portForward(K8S_INTERPRETER_SERVICE_PORT, localforwardedPodPort);
-        super.processStarted(localforwardedPodPort, "localhost");
+            .portForward(K8S_INTERPRETER_SERVICE_PORT, localForwardedPodPort);
+        super.processStarted(localForwardedPodPort, "localhost");
       } catch (IOException e) {
         LOGGER.error("Unable to create a PortForward", e);
       }
@@ -452,6 +452,6 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess
 
   @Override
   public String getErrorMessage() {
-    return String.format("%s%ncurrent PodPhase: %s", super.getErrorMessage(), getPodPhase());
+    return String.format("%s%n current PodPhase: %s", super.getErrorMessage(), getPodPhase());
   }
 }
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index d76727b..bd620fe 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -45,7 +45,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
   private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
   private final KubernetesClient client;
 
-  public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException {
+  public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
     super(zConf, recoveryStorage);
     client = new DefaultKubernetesClient();
   }
@@ -59,7 +59,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
    * to use K8sStandardInterpreterLauncher. This is useful for development. It allows Zeppelin server running on your
    * IDE and creates your interpreters in Kubernetes. So any code changes on Zeppelin server or kubernetes yaml spec
    * can be applied without re-building docker image.
-   * @return
+   * @return true, if running on K8s
    */
   boolean isRunningOnKubernetes() {
     return new File("/var/run/secrets/kubernetes.io").exists();
@@ -67,7 +67,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
 
   /**
    * Get current namespace
-   * @throws IOException
+   * @throws IOException if namespace file could not be read
    */
   String getNamespace() throws IOException {
     if (isRunningOnKubernetes()) {
@@ -78,8 +78,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
   }
 
   /**
-   * Get hostname. It should be the same to Service name (and Pod name) of the Kubernetes
-   * @return
+   * @return Get hostname. It should be the same to Service name (and Pod name) of the Kubernetes or
+   * localhost in case of an UnknownHostException
    */
   String getHostname() {
     try {
@@ -90,9 +90,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
   }
 
   /**
-   * get Zeppelin service.
-   * return <service-name>.<namespace>.svc
-   * @throws IOException
+   * @return get Zeppelin service. <service-name>.<namespace>.svc
+   * @throws IOException if the Zeppelin service cannot be generated
    */
   private String getZeppelinService(InterpreterLaunchContext context) throws IOException {
     if (isRunningOnKubernetes()) {
@@ -122,7 +121,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
    * Interpreter Process will run in K8s. There is no point in changing the user after starting the container.
    * Switching to an other user (non-privileged) should be done during the image creation process.
    *
-   * Only if a spark interpreter process is running, userImpersonatation should be possible for --proxy-user
+   * Only if a spark interpreter process is running, user impersonation should be possible for --proxy-user
    */
   private boolean isUserImpersonateForSparkInterpreter(InterpreterLaunchContext context) {
       return zConf.getZeppelinImpersonateSparkProxyUser() &&
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index dbc4555..eafad5a 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -78,7 +78,7 @@ public class K8sRemoteInterpreterProcessTest {
     // when change those values, update the yaml file as well.
     assertEquals("12321:12321", intp.getInterpreterPortRange());
     assertEquals(22321, intp.getSparkDriverPort());
-    assertEquals(22322, intp.getSparkBlockmanagerPort());
+    assertEquals(22322, intp.getSparkBlockManagerPort());
   }
 
   @Test
@@ -178,7 +178,7 @@ public class K8sRemoteInterpreterProcessTest {
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
     assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
     assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
-    assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
+    assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort()));
     assertFalse(sparkSubmitOptions.contains("--proxy-user"));
     assertTrue(intp.isSpark());
   }
@@ -229,7 +229,7 @@ public class K8sRemoteInterpreterProcessTest {
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
     assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
     assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
-    assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
+    assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockManagerPort()));
     assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser"));
     assertTrue(intp.isSpark());
   }
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
index aa08352..735800b 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/PodPhaseWatcherTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.StringUtils;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -41,6 +42,7 @@ public class PodPhaseWatcherTest {
   public KubernetesServer server = new KubernetesServer(false, true);
 
   @Test
+  @Ignore("Reamer - ZEPPELIN-5403")
   public void testPhase() throws InterruptedException {
     KubernetesClient client = server.getClient();
     // CREATE
@@ -71,6 +73,7 @@ public class PodPhaseWatcherTest {
   }
 
   @Test
+  @Ignore("Reamer - ZEPPELIN-5403")
   public void testPhaseWithError() throws InterruptedException {
     KubernetesClient client = server.getClient();
     // CREATE