You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2021/02/26 12:55:24 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5259] Use RemoteInterpreterManagedProcess for K8sRemoteInterpreterProcess

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new dff554c  [ZEPPELIN-5259] Use RemoteInterpreterManagedProcess for K8sRemoteInterpreterProcess
dff554c is described below

commit dff554ccf9dc023a69c4ab082ea73708981a9ef8
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Tue Feb 16 18:34:40 2021 +0100

    [ZEPPELIN-5259] Use RemoteInterpreterManagedProcess for K8sRemoteInterpreterProcess
    
    ### What is this PR for?
    This PR changes the parent class for K8sRemoteInterpreterProcess to RemoteInterpreterManagedProcess.
    It also contains some bug fixes.
    
    I removed the static method `RemoteInterpreterUtils.checkIfRemoteEndpointAccessible`, because this function is unnecessary and it also blocks testing. It is sufficient to listen for `processStarted`. This signals a successful start of the interpreter.
    
    ### What type of PR is it?
    Improvement and bug fix
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5259
    
    ### How should this be tested?
    * CI
    * Port forwarding was tested locally
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Philipp Dallig <ph...@gmail.com>
    
    Closes #4060 from Reamer/K8s_remote_interpreter_manager and squashes the following commits:
    
    26bed4515 [Philipp Dallig] Remove RemoteInterpreterUtils.checkIfRemoteEndpointAccessible and add Kube-Mock tests
    ec2843497 [Philipp Dallig] Rewrite port-forward
    b6233e264 [Philipp Dallig] Remove duplicate code
    8e7ed67ba [Philipp Dallig] Remove context as a local varaible
    0051b878a [Philipp Dallig] Use RemoteInterpreterManagedProcess instead of RemoteInterpreterProcess
    f010075cb [Philipp Dallig] Move pod random suffix to K8sUtils
    
    (cherry picked from commit 7bf1f248733a74a2ca674f17ce5165de214bba68)
    Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
 .../launcher/K8sRemoteInterpreterProcess.java      | 178 ++++++---------
 .../launcher/K8sStandardInterpreterLauncher.java   |  10 +-
 .../zeppelin/interpreter/launcher/K8sUtils.java    |  14 ++
 .../launcher/K8sRemoteInterpreterProcessTest.java  | 249 ++++++++++++++++++---
 .../test/resources/k8s-specs/interpreter-spec.yaml | 162 ++++++++++++++
 5 files changed, 459 insertions(+), 154 deletions(-)

diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 668343c..e4b39ae 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -25,13 +25,12 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,31 +47,26 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.LocalPortForward;
 import io.fabric8.kubernetes.client.dsl.ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable;
 
-public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
+public class K8sRemoteInterpreterProcess extends RemoteInterpreterManagedProcess {
   private static final Logger LOGGER = LoggerFactory.getLogger(K8sRemoteInterpreterProcess.class);
   private static final int K8S_INTERPRETER_SERVICE_PORT = 12321;
   private final KubernetesClient client;
   private final String namespace;
-  private final String interpreterGroupId;
   private final String interpreterGroupName;
-  private final String interpreterSettingName;
   private final File specTemplates;
   private final String containerImage;
   private final Properties properties;
-  private final Map<String, String> envs;
 
   private final String podName;
-  private final boolean portForward;
   private final String sparkImage;
+
+  // Pod Forward
+  private final boolean portForward;
   private LocalPortForward localPortForward;
-  private int podPort = K8S_INTERPRETER_SERVICE_PORT;
-  private String errorMessage;
 
-  private final boolean isUserImpersonatedForSpark;
   private final boolean timeoutDuringPending;
 
   private AtomicBoolean started = new AtomicBoolean(false);
-  private Random rand = new Random();
 
   private static final String SPARK_DRIVER_MEMORY = "spark.driver.memory";
   private static final String SPARK_DRIVER_MEMORY_OVERHEAD = "spark.driver.memoryOverhead";
@@ -99,24 +93,29 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
           boolean isUserImpersonatedForSpark,
           boolean timeoutDuringPending
   ) {
-    super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort);
+    super(intpEventServerPort,
+          intpEventServerHost,
+          String.format("%d:%d", K8S_INTERPRETER_SERVICE_PORT, K8S_INTERPRETER_SERVICE_PORT),
+          "${ZEPPELIN_HOME}/interpreter/" + interpreterGroupName,
+          "/tmp/local-repo",
+          envs,
+          connectTimeout,
+          connectionPoolSize,
+          interpreterSettingName,
+          interpreterGroupId,
+          isUserImpersonatedForSpark);
     this.client = client;
     this.namespace = namespace;
     this.specTemplates = specTemplates;
     this.containerImage = containerImage;
-    this.interpreterGroupId = interpreterGroupId;
     this.interpreterGroupName = interpreterGroupName;
-    this.interpreterSettingName = interpreterSettingName;
     this.properties = properties;
-    this.envs = new HashMap<>(envs);
     this.portForward = portForward;
     this.sparkImage = sparkImage;
-    this.podName = interpreterGroupName.toLowerCase() + "-" + getRandomString(6);
-    this.isUserImpersonatedForSpark = isUserImpersonatedForSpark;
+    this.podName = interpreterGroupName.toLowerCase() + "-" + K8sUtils.getRandomPodSuffix(6);
     this.timeoutDuringPending = timeoutDuringPending;
   }
 
-
   /**
    * Get interpreter pod name
    * @return
@@ -134,35 +133,21 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   }
 
   @Override
-  public String getInterpreterGroupId() {
-    return interpreterGroupId;
-  }
-
-  @Override
-  public String getInterpreterSettingName() {
-    return interpreterSettingName;
-  }
-
-  @Override
   public void start(String userName) throws IOException {
 
     Properties templateProperties = getTemplateBindings(userName);
     // create new pod
     apply(specTemplates, false, templateProperties);
 
-    if (portForward) {
-      podPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
-      localPortForward = client.pods().inNamespace(namespace).withName(podName).portForward(K8S_INTERPRETER_SERVICE_PORT, podPort);
-    }
-
     // special handling if we doesn't want timeout the process during lifecycle phase pending
-    if (!timeoutDuringPending) {
-      while ("pending".equalsIgnoreCase(getPodPhase()) && !Thread.currentThread().isInterrupted()) {
+    if (timeoutDuringPending) {
+      while (!StringUtils.equalsAnyIgnoreCase(getPodPhase(), "Succeeded", "Failed", "Running")
+          && !Thread.currentThread().isInterrupted()) {
         try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
           LOGGER.error("Interrupt received during pending phase. Try to stop the interpreter and interrupt the current thread.", e);
-          errorMessage = "Start process was interrupted during the pending phase";
+          processStopped("Start process was interrupted during the pending phase");
           stop();
           Thread.currentThread().interrupt();
         }
@@ -177,7 +162,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
       while (!started.get() && !Thread.currentThread().isInterrupted()) {
         long timetoTimeout = timeoutTime - System.currentTimeMillis();
         if (timetoTimeout <= 0) {
-          errorMessage = "The start process was aborted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase();
+          processStopped("The start process was aborted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase());
           stop();
           throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
         }
@@ -185,33 +170,17 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
           started.wait(timetoTimeout);
         } catch (InterruptedException e) {
           LOGGER.error("Interrupt received during started wait. Try to stop the interpreter and interrupt the current thread.", e);
-          errorMessage = "The start process was interrupted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase();
+          processStopped("The start process was interrupted while waiting for the interpreter to start. PodPhase before stop: " + getPodPhase());
           stop();
           Thread.currentThread().interrupt();
         }
       }
     }
-
-    // waits for interpreter thrift rpc server ready
-    while (!RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()) && !Thread.currentThread().isInterrupted()) {
-      if (System.currentTimeMillis() - timeoutTime > 0) {
-        errorMessage = "The start process was aborted while waiting for the accessibility check of the remote end point. PodPhase before stop: " + getPodPhase();
-        stop();
-        throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        LOGGER.error("Interrupt received during remote endpoint accessible check. Try to stop the interpreter and interrupt the current thread.", e);
-        errorMessage = "The start process was interrupted while waiting for the accessibility check of the remote end point. PodPhase before stop: " + getPodPhase();
-        stop();
-        Thread.currentThread().interrupt();
-      }
-    }
   }
 
   @Override
   public void stop() {
+    super.stop();
     Properties templateProperties = getTemplateBindings(null);
     // delete pod
     try {
@@ -219,54 +188,32 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     } catch (IOException e) {
       LOGGER.info("Error on removing interpreter pod", e);
     }
-    if (portForward) {
-        try {
-            localPortForward.close();
-        } catch (IOException e) {
-            LOGGER.info("Error on closing portforwarder", e);
-        }
-    }
-    // Shutdown connection
-    shutdown();
-  }
-
-  @Override
-  public String getHost() {
-    if (portForward) {
-      return "localhost";
-    } else {
-      return getInterpreterPodDnsName();
+    if (portForward && localPortForward != null) {
+      LOGGER.info("Stopping Port Forwarding");
+      try {
+        localPortForward.close();
+      } catch (IOException e) {
+        LOGGER.info("Error on closing portforwarder", e);
+      }
     }
   }
 
   @Override
-  public int getPort() {
-    return podPort;
+  public boolean isRunning() {
+    return "Running".equalsIgnoreCase(getPodPhase()) && started.get();
   }
 
-  @Override
-  public boolean isRunning() {
+  public String getPodPhase() {
     try {
-      if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
-        return true;
-      }
       Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
       if (pod != null) {
         PodStatus status = pod.getStatus();
         if (status != null) {
-          return "Running".equals(status.getPhase()) && started.get();
+          return status.getPhase();
         }
       }
     } catch (Exception e) {
-      LOGGER.error("Can't get pod status", e);
-    }
-    return false;
-  }
-
-  public String getPodPhase() {
-    Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
-    if (pod != null) {
-      return pod.getStatus().getPhase();
+      LOGGER.error("Can't get pod phase", e);
     }
     return "Unknown";
   }
@@ -315,29 +262,30 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     k8sProperties.put("zeppelin.k8s.interpreter.pod.name", getPodName());
     k8sProperties.put("zeppelin.k8s.interpreter.container.name", interpreterGroupName.toLowerCase());
     k8sProperties.put("zeppelin.k8s.interpreter.container.image", containerImage);
-    k8sProperties.put("zeppelin.k8s.interpreter.group.id", interpreterGroupId);
+    k8sProperties.put("zeppelin.k8s.interpreter.group.id", getInterpreterGroupId());
     k8sProperties.put("zeppelin.k8s.interpreter.group.name", interpreterGroupName);
-    k8sProperties.put("zeppelin.k8s.interpreter.setting.name", interpreterSettingName);
-    k8sProperties.put("zeppelin.k8s.interpreter.localRepo", "/tmp/local-repo");
-    k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", String.format("%d:%d", getPort(), getPort()));
+    k8sProperties.put("zeppelin.k8s.interpreter.setting.name", getInterpreterSettingName());
+    k8sProperties.put("zeppelin.k8s.interpreter.localRepo", getLocalRepoDir());
+    k8sProperties.put("zeppelin.k8s.interpreter.rpc.portRange", getInterpreterPortRange());
     k8sProperties.put("zeppelin.k8s.server.rpc.service", intpEventServerHost);
     k8sProperties.put("zeppelin.k8s.server.rpc.portRange", intpEventServerPort);
     if (ownerUID() != null && ownerName() != null) {
       k8sProperties.put("zeppelin.k8s.server.uid", ownerUID());
       k8sProperties.put("zeppelin.k8s.server.pod.name", ownerName());
     }
-
+    Map<String, String> k8sEnv = new HashMap<>(getEnv());
     // environment variables
-    envs.put(ENV_SERVICE_DOMAIN, envs.getOrDefault(ENV_SERVICE_DOMAIN, System.getenv(ENV_SERVICE_DOMAIN)));
-    envs.put(ENV_ZEPPELIN_HOME, envs.getOrDefault(ENV_ZEPPELIN_HOME, System.getenv(ENV_ZEPPELIN_HOME)));
+    k8sEnv.put(ENV_SERVICE_DOMAIN, getEnv().getOrDefault(ENV_SERVICE_DOMAIN, System.getenv(ENV_SERVICE_DOMAIN)));
+    k8sEnv.put(ENV_ZEPPELIN_HOME, getEnv().getOrDefault(ENV_ZEPPELIN_HOME, System.getenv(ENV_ZEPPELIN_HOME)));
 
     if (isSpark()) {
       int webUiPort = 4040;
       k8sProperties.put("zeppelin.k8s.spark.container.image", sparkImage);
       if (isSparkOnKubernetes(properties)) {
-        envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions(userName));
+        k8sEnv.put("SPARK_SUBMIT_OPTIONS",
+            getEnv().getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions(userName));
       }
-      envs.put("SPARK_HOME", envs.getOrDefault("SPARK_HOME", "/spark"));
+      k8sEnv.put("SPARK_HOME", getEnv().getOrDefault("SPARK_HOME", "/spark"));
 
       // configure interpreter property "zeppelin.spark.uiWebUrl" if not defined, to enable spark ui through reverse proxy
       String webUrl = (String) properties.get("zeppelin.spark.uiWebUrl");
@@ -349,7 +297,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
               webUrl,
               webUiPort,
               getPodName(),
-              envs.get(ENV_SERVICE_DOMAIN)
+              k8sEnv.get(ENV_SERVICE_DOMAIN)
           ));
       // Resources of Interpreter Pod
       if (properties.containsKey(SPARK_DRIVER_MEMORY)) {
@@ -367,7 +315,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
       }
     }
 
-    k8sProperties.put("zeppelin.k8s.envs", envs);
+    k8sProperties.put("zeppelin.k8s.envs", k8sEnv);
 
     // interpreter properties overrides the values
     k8sProperties.putAll(Maps.fromProperties(properties));
@@ -411,7 +359,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     if (properties.containsKey(SPARK_DRIVER_MEMORY)) {
       options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMORY));
     }
-    if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) {
+    if (isUserImpersonated() && !StringUtils.containsIgnoreCase(userName, "anonymous")) {
       options.append(" --proxy-user " + userName);
     }
     options.append(" --conf spark.kubernetes.namespace=" + getNamespace());
@@ -463,20 +411,22 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     return System.getenv("POD_NAME");
   }
 
-  private String getRandomString(int length) {
-    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
-
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < length; i++) {
-      char c = chars[rand.nextInt(chars.length)];
-      sb.append(c);
-    }
-    return sb.toString();
-  }
-
   @Override
   public void processStarted(int port, String host) {
-    LOGGER.info("Interpreter pod created {}:{}", host, port);
+    if (portForward) {
+      LOGGER.info("Starting Port Forwarding");
+      try {
+        int localforwardedPodPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+        localPortForward = client.pods().inNamespace(namespace).withName(podName)
+            .portForward(K8S_INTERPRETER_SERVICE_PORT, localforwardedPodPort);
+        super.processStarted(localforwardedPodPort, "localhost");
+      } catch (IOException e) {
+        LOGGER.error("Unable to create a PortForward", e);
+      }
+    } else {
+      super.processStarted(port, getInterpreterPodDnsName());
+    }
+    LOGGER.info("Interpreter pod created {}:{}", getHost(), getPort());
     synchronized (started) {
       started.set(true);
       started.notifyAll();
@@ -485,6 +435,6 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
 
   @Override
   public String getErrorMessage() {
-    return String.format("%s%ncurrent PodPhase: %s", errorMessage, getPodPhase());
+    return String.format("%s%ncurrent PodPhase: %s", super.getErrorMessage(), getPodPhase());
   }
 }
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index 289bae3..d76727b 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -43,7 +43,6 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
-  private InterpreterLaunchContext context;
   private final KubernetesClient client;
 
   public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException {
@@ -95,7 +94,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
    * return <service-name>.<namespace>.svc
    * @throws IOException
    */
-  private String getZeppelinService() throws IOException {
+  private String getZeppelinService(InterpreterLaunchContext context) throws IOException {
     if (isRunningOnKubernetes()) {
       return String.format("%s.%s.svc",
               zConf.getK8sServiceName(),
@@ -109,7 +108,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
    * get Zeppelin server rpc port
    * Read env variable "<HOSTNAME>_SERVICE_PORT_RPC"
    */
-  private int getZeppelinServiceRpcPort() {
+  private int getZeppelinServiceRpcPort(InterpreterLaunchContext context) {
     String envServicePort = System.getenv(
             String.format("%s_SERVICE_PORT_RPC", getHostname().replaceAll("[-.]", "_").toUpperCase()));
     if (envServicePort != null) {
@@ -134,7 +133,6 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
   @Override
   public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws IOException {
     LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
-    this.context = context;
     this.properties = context.getProperties();
 
     return new K8sRemoteInterpreterProcess(
@@ -147,8 +145,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
             context.getInterpreterSettingName(),
             properties,
             buildEnvFromProperties(context),
-            getZeppelinService(),
-            getZeppelinServiceRpcPort(),
+            getZeppelinService(context),
+            getZeppelinServiceRpcPort(context),
             zConf.getK8sPortForward(),
             zConf.getK8sSparkContainerImage(),
             getConnectTimeout(),
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java
index dd0e05f..e09c517 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -31,6 +32,8 @@ public class K8sUtils {
   private static final long T = G * K;
   private static final long MINIMUM_OVERHEAD = 384;
 
+  private static final Random rand = new Random();
+
   private K8sUtils() {
     // do nothing
   }
@@ -74,4 +77,15 @@ public class K8sUtils {
     }
     return memoryAmountBytes;
   }
+
+  public static String getRandomPodSuffix(int length) {
+    char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < length; i++) {
+      char c = chars[rand.nextInt(chars.length)];
+      sb.append(c);
+    }
+    return sb.toString();
+  }
 }
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 95cd604..a8e9b30 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -19,16 +19,27 @@ package org.apache.zeppelin.interpreter.launcher;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.junit.Rule;
 import org.junit.Test;
-
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
 
 public class K8sRemoteInterpreterProcessTest {
@@ -37,36 +48,6 @@ public class K8sRemoteInterpreterProcessTest {
   public KubernetesServer server = new KubernetesServer(true, true);
 
   @Test
-  public void testGetHostPort() {
-    // given
-    Properties properties = new Properties();
-    HashMap<String, String> envs = new HashMap<String, String>();
-
-    K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        server.getClient(),
-        "default",
-        new File(".skip"),
-        "interpreter-container:1.0",
-        "shared_process",
-        "sh",
-        "shell",
-        properties,
-        envs,
-        "zeppelin.server.hostname",
-        12320,
-        false,
-        "spark-container:1.0",
-        10,
-        10,
-        false,
-        false);
-
-    // then
-    assertEquals(String.format("%s.%s.svc", intp.getPodName(), "default"), intp.getHost());
-    assertEquals(12321, intp.getPort());
-  }
-
-  @Test
   public void testPredefinedPortNumbers() {
     // given
     Properties properties = new Properties();
@@ -94,7 +75,7 @@ public class K8sRemoteInterpreterProcessTest {
 
     // following values are hardcoded in k8s/interpreter/100-interpreter.yaml.
     // when change those values, update the yaml file as well.
-    assertEquals(12321, intp.getPort());
+    assertEquals("12321:12321", intp.getInterpreterPortRange());
     assertEquals(22321, intp.getSparkDriverPort());
     assertEquals(22322, intp.getSparkBlockmanagerPort());
   }
@@ -194,7 +175,7 @@ public class K8sRemoteInterpreterProcessTest {
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
-    assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
+    assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
     assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
     assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
     assertFalse(sparkSubmitOptions.contains("--proxy-user"));
@@ -245,7 +226,7 @@ public class K8sRemoteInterpreterProcessTest {
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
-    assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
+    assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getPodName() + ".default.svc"));
     assertTrue(sparkSubmitOptions.contains("spark.driver.port=" + intp.getSparkDriverPort()));
     assertTrue(sparkSubmitOptions.contains("spark.blockManager.port=" + intp.getSparkBlockmanagerPort()));
     assertTrue(sparkSubmitOptions.contains("--proxy-user mytestUser"));
@@ -412,4 +393,204 @@ public class K8sRemoteInterpreterProcessTest {
     assertEquals("1280Mi", p.get("zeppelin.k8s.interpreter.memory"));
   }
 
+  @Test
+  public void testK8sStartSuccessful() throws IOException, InterruptedException {
+    // given
+    Properties properties = new Properties();
+    HashMap<String, String> envs = new HashMap<String, String>();
+    envs.put("SERVICE_DOMAIN", "mydomain");
+    URL url = Thread.currentThread().getContextClassLoader()
+        .getResource("k8s-specs/interpreter-spec.yaml");
+    File file = new File(url.getPath());
+
+    K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+        server.getClient(),
+        "default",
+        file,
+        "interpreter-container:1.0",
+        "shared_process",
+        "spark",
+        "myspark",
+        properties,
+        envs,
+        "zeppelin.server.service",
+        12320,
+        false,
+        "spark-container:1.0",
+        10000,
+        10,
+        false,
+        true);
+    ExecutorService service = Executors.newFixedThreadPool(1);
+    service
+        .submit(new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp));
+    intp.start("TestUser");
+    // then
+    assertEquals("Running", intp.getPodPhase());
+  }
+
+  @Test
+  public void testK8sStartFailed() throws IOException, InterruptedException {
+    // given
+    Properties properties = new Properties();
+    HashMap<String, String> envs = new HashMap<String, String>();
+    envs.put("SERVICE_DOMAIN", "mydomain");
+    URL url = Thread.currentThread().getContextClassLoader()
+        .getResource("k8s-specs/interpreter-spec.yaml");
+    File file = new File(url.getPath());
+
+    K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+        server.getClient(),
+        "default",
+        file,
+        "interpreter-container:1.0",
+        "shared_process",
+        "spark",
+        "myspark",
+        properties,
+        envs,
+        "zeppelin.server.service",
+        12320,
+        false,
+        "spark-container:1.0",
+        3000,
+        10,
+        false,
+        true);
+    PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp);
+    podStatusSimulator.setSecondPhase("Failed");
+    podStatusSimulator.setSuccessfullStart(false);
+    ExecutorService service = Executors.newFixedThreadPool(1);
+    service
+        .submit(podStatusSimulator);
+    // should throw an IOException
+    try {
+      intp.start("TestUser");
+      fail("We excepting an IOException");
+    } catch (IOException e) {
+      assertNotNull(e);
+      // Check that the Pod is deleted
+      assertNull(
+          server.getClient().pods().inNamespace(intp.getNamespace()).withName(intp.getPodName())
+              .get());
+    }
+  }
+
+  @Test
+  public void testK8sStartTimeoutPending() throws IOException, InterruptedException {
+    // given
+    Properties properties = new Properties();
+    HashMap<String, String> envs = new HashMap<String, String>();
+    envs.put("SERVICE_DOMAIN", "mydomain");
+    URL url = Thread.currentThread().getContextClassLoader()
+        .getResource("k8s-specs/interpreter-spec.yaml");
+    File file = new File(url.getPath());
+
+    K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+        server.getClient(),
+        "default",
+        file,
+        "interpreter-container:1.0",
+        "shared_process",
+        "spark",
+        "myspark",
+        properties,
+        envs,
+        "zeppelin.server.service",
+        12320,
+        false,
+        "spark-container:1.0",
+        3000,
+        10,
+        false,
+        true);
+    PodStatusSimulator podStatusSimulator = new PodStatusSimulator(server.getClient(), intp.getNamespace(), intp.getPodName(), intp);
+    podStatusSimulator.setFirstPhase("Pending");
+    podStatusSimulator.setSecondPhase("Pending");
+    podStatusSimulator.setSuccessfullStart(false);
+    ExecutorService service = Executors.newFixedThreadPool(2);
+    service
+        .submit(podStatusSimulator);
+    service.submit(() -> {
+      try {
+        intp.start("TestUser");
+        fail("We interrupt, this line of code should not be executed.");
+      } catch (IOException e) {
+        fail("We interrupt, this line of code should not be executed.");
+      }
+    });
+    // wait a little bit
+    TimeUnit.SECONDS.sleep(5);
+    service.shutdownNow();
+    // wait for a shutdown
+    service.awaitTermination(10, TimeUnit.SECONDS);
+    // Check that the Pod is deleted
+    assertNull(server.getClient().pods().inNamespace(intp.getNamespace())
+        .withName(intp.getPodName()).get());
+
+  }
+
+  class PodStatusSimulator implements Runnable {
+
+    private final KubernetesClient client;
+    private final String namespace;
+    private final String podName;
+    private final RemoteInterpreterManagedProcess process;
+
+    private String firstPhase = "Pending";
+    private String secondPhase = "Running";
+    private boolean successfulStart = true;
+
+    public PodStatusSimulator(
+        KubernetesClient client,
+        String namespace,
+        String podName,
+        RemoteInterpreterManagedProcess process) {
+      this.client = client;
+      this.namespace = namespace;
+      this.podName = podName;
+      this.process = process;
+    }
+
+    public void setFirstPhase(String phase) {
+      this.firstPhase = phase;
+    }
+    public void setSecondPhase(String phase) {
+      this.secondPhase = phase;
+    }
+    public void setSuccessfullStart(boolean successful) {
+      this.successfulStart = successful;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Instant timeoutTime = Instant.now().plusSeconds(10);
+        while (timeoutTime.isAfter(Instant.now())) {
+          Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
+          if (pod != null) {
+            TimeUnit.SECONDS.sleep(1);
+            // Update Pod to "pending" phase
+            pod.setStatus(new PodStatus(null, null, null, null, null, null, null, firstPhase,
+                null,
+                null, null, null, null));
+            client.pods().inNamespace(namespace).updateStatus(pod);
+            // Update Pod to "Running" phase
+            pod.setStatus(new PodStatus(null, null, null, null, null, null, null, secondPhase,
+                null,
+                null, null, null, null));
+            client.pods().inNamespace(namespace).updateStatus(pod);
+            TimeUnit.SECONDS.sleep(1);
+            if (successfulStart) {
+              process.processStarted(12320, "testing");
+            }
+            break;
+          } else {
+            TimeUnit.MILLISECONDS.sleep(100);
+          }
+        }
+      } catch (InterruptedException e) {
+      }
+    }
+  }
 }
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
new file mode 100644
index 0000000..116b0df
--- /dev/null
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/resources/k8s-specs/interpreter-spec.yaml
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Pod that runs the interpreter container, started through bin/interpreter.sh.
+# For the "spark" interpreter group the service account token is mounted and an init
+# container copies /opt/spark into an emptyDir volume that is mounted at /spark.
+kind: Pod
+apiVersion: v1
+metadata:
+  namespace: {{zeppelin.k8s.namespace}}
+  name: {{zeppelin.k8s.interpreter.pod.name}}
+  labels:
+    app: {{zeppelin.k8s.interpreter.pod.name}}
+    interpreterGroupId: {{zeppelin.k8s.interpreter.group.id}}
+    interpreterSettingName: {{zeppelin.k8s.interpreter.setting.name}}
+  {% if zeppelin.k8s.server.uid is defined %}
+  ownerReferences:
+  - apiVersion: v1
+    controller: false
+    blockOwnerDeletion: false
+    kind: Pod
+    name: {{zeppelin.k8s.server.pod.name}}
+    uid: {{zeppelin.k8s.server.uid}}
+  {% endif %}
+spec:
+  {% if zeppelin.k8s.interpreter.group.name == "spark" %}
+  automountServiceAccountToken: true
+  {% else %}
+  automountServiceAccountToken: false
+  {% endif %}
+  restartPolicy: Never
+  terminationGracePeriodSeconds: 30
+  containers:
+  - name: {{zeppelin.k8s.interpreter.container.name}}
+    image: {{zeppelin.k8s.interpreter.container.image}}
+    args:
+      - "$(ZEPPELIN_HOME)/bin/interpreter.sh"
+      - "-d"
+      - "$(ZEPPELIN_HOME)/interpreter/{{zeppelin.k8s.interpreter.group.name}}"
+      - "-r"
+      - "{{zeppelin.k8s.interpreter.rpc.portRange}}"
+      - "-c"
+      - "{{zeppelin.k8s.server.rpc.service}}"
+      - "-p"
+      - "{{zeppelin.k8s.server.rpc.portRange}}"
+      - "-i"
+      - "{{zeppelin.k8s.interpreter.group.id}}"
+      - "-l"
+      - "{{zeppelin.k8s.interpreter.localRepo}}/{{zeppelin.k8s.interpreter.setting.name}}"
+      - "-g"
+      - "{{zeppelin.k8s.interpreter.setting.name}}"
+    env:
+  {% for key, value in zeppelin.k8s.envs.items() %}
+    - name: {{key}}
+      value: {{value}}
+  {% endfor %}
+  {% if zeppelin.k8s.interpreter.cores is defined and zeppelin.k8s.interpreter.memory is defined %}
+    resources:
+      requests:
+        memory: "{{zeppelin.k8s.interpreter.memory}}"
+        cpu: "{{zeppelin.k8s.interpreter.cores}}"
+{# limits.memory is not set because of a potential OOM-Killer. https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits #}
+      limits:
+        cpu: "{{zeppelin.k8s.interpreter.cores}}"
+  {% endif %}
+  {% if zeppelin.k8s.interpreter.group.name == "spark" %}
+    volumeMounts:
+    - name: spark-home
+      mountPath: /spark
+  initContainers:
+  - name: spark-home-init
+    image: {{zeppelin.k8s.spark.container.image}}
+    command: ["sh", "-c", "cp -r /opt/spark/* /spark/"]
+    volumeMounts:
+    - name: spark-home
+      mountPath: /spark
+  volumes:
+  - name: spark-home
+    emptyDir: {}
+  {% endif %}
+---
+# Headless Service (clusterIP: None) that selects the interpreter Pod through its app label.
+# Port 12321 is always exposed; the spark ports are added only for the "spark" group.
+kind: Service
+apiVersion: v1
+metadata:
+  namespace: {{zeppelin.k8s.namespace}}
+  name: {{zeppelin.k8s.interpreter.pod.name}}             # keep the Service name the same as the Pod name.
+  {% if zeppelin.k8s.server.uid is defined %}
+  ownerReferences:
+  - apiVersion: v1
+    controller: false
+    blockOwnerDeletion: false
+    kind: Pod
+    name: {{zeppelin.k8s.server.pod.name}}
+    uid: {{zeppelin.k8s.server.uid}}
+  {% endif %}
+spec:
+  clusterIP: None
+  ports:
+    - name: intp
+      port: 12321
+    {% if zeppelin.k8s.interpreter.group.name == "spark" %}
+    - name: spark-driver
+      port: 22321
+    - name: spark-blockmanager
+      port: 22322
+    - name: spark-ui
+      port: 4040
+    {% endif %}
+  selector:
+    app: {{zeppelin.k8s.interpreter.pod.name}}
+{% if zeppelin.k8s.interpreter.group.name == "spark" %}
+---
+# Role granting create/get/update/list/delete/watch on pods and services.
+# It is only part of the spec for the "spark" interpreter group (see the enclosing if).
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: {{zeppelin.k8s.interpreter.pod.name}}
+  namespace: {{zeppelin.k8s.namespace}}
+  {% if zeppelin.k8s.server.uid is defined %}
+  ownerReferences:
+  - apiVersion: v1
+    controller: false
+    blockOwnerDeletion: false
+    kind: Pod
+    name: {{zeppelin.k8s.server.pod.name}}
+    uid: {{zeppelin.k8s.server.uid}}
+  {% endif %}
+rules:
+- apiGroups: [""]
+  resources: ["pods", "services"]
+  verbs: ["create", "get", "update", "list", "delete", "watch" ]
+---
+# Binds the "default" ServiceAccount to the Role that carries the interpreter pod name.
+# NOTE(review): unlike the Role, no namespace is set in the metadata here - confirm this is intended.
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: {{zeppelin.k8s.interpreter.pod.name}}
+  {% if zeppelin.k8s.server.uid is defined %}
+  ownerReferences:
+  - apiVersion: v1
+    controller: false
+    blockOwnerDeletion: false
+    kind: Pod
+    name: {{zeppelin.k8s.server.pod.name}}
+    uid: {{zeppelin.k8s.server.uid}}
+  {% endif %}
+subjects:
+- kind: ServiceAccount
+  name: default
+roleRef:
+  kind: Role
+  name: {{zeppelin.k8s.interpreter.pod.name}}
+  apiGroup: rbac.authorization.k8s.io
+{% endif %}
\ No newline at end of file