You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2020/07/13 15:07:04 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4915] K8s with java lib

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 8af054e  [ZEPPELIN-4915] K8s with java lib
8af054e is described below

commit 8af054e851a5f3cae5238a311600279d630354ed
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Wed Jul 8 18:21:37 2020 +0200

    [ZEPPELIN-4915] K8s with java lib
    
    ### What is this PR for?
    This PR changes from a Kubectl binary to a Java library for connecting to a Kubernetes cluster.
    
    I decided to use [fabric8/kubernetes-client](https://github.com/fabric8io/kubernetes-client), because it's also used by the [Apache Spark project](https://github.com/fabric8io/kubernetes-client#who-uses-kubernetes--openshift-java-client) and allows the import of our generic files rendered with jinja2.
    
    ### What type of PR is it?
    - Improvement
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4915
    
    ### How should this be tested?
    * Travis-CI: https://travis-ci.org/github/Reamer/zeppelin/builds/706228986
    
    ### Questions:
    * Does the licenses files need update? Yes
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Philipp Dallig <ph...@gmail.com>
    
    Closes #3825 from Reamer/k8s_with_java_lib and squashes the following commits:
    
    2cb46a7c4 [Philipp Dallig] Describe how an Auth-Token can be provided for communication with K8s
    c6a7cda78 [Philipp Dallig] Added the ability to use a custom namespace for development
    1f50b3971 [Philipp Dallig] Correct rendering of template
    495c90bb1 [Philipp Dallig] accept the proposals of the review
    37acc5e25 [Philipp Dallig] Add license for kubernetes-client
    a95bd1988 [Philipp Dallig] Some cleanup
    827cb5df4 [Philipp Dallig] Rewrite thread handling for K8s Launcher
    143a5b5ec [Philipp Dallig] Switch from kubectl binary to java library
    ce9fce12a [Philipp Dallig] Update jinja to a new version
    bdbcec83f [Philipp Dallig] Add LstripBlock and TrimBlocks in Jinja rendering
    c4906f27b [Philipp Dallig] cleanup RemoteInterpreterUtils
    
    (cherry picked from commit b70a1b81b4fe1bea81313721f3ab1abcab0d7929)
    Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
 LICENSE                                            |   1 +
 docs/quickstart/kubernetes.md                      |   3 +-
 scripts/docker/zeppelin/bin/Dockerfile             |   7 -
 .../zeppelin/conf/ZeppelinConfiguration.java       |   6 +-
 .../interpreter/launcher/InterpreterLauncher.java  |  19 +-
 .../interpreter/remote/RemoteInterpreterUtils.java |  28 +--
 zeppelin-plugins/launcher/k8s-standard/pom.xml     |  77 ++++++++-
 .../launcher/K8sRemoteInterpreterProcess.java      | 191 ++++++++++-----------
 .../interpreter/launcher/K8sSpecTemplate.java      |  11 +-
 .../launcher/K8sStandardInterpreterLauncher.java   |  33 ++--
 .../zeppelin/interpreter/launcher/Kubectl.java     | 155 -----------------
 .../launcher/K8sRemoteInterpreterProcessTest.java  |  91 ++++------
 .../interpreter/launcher/K8sSpecTemplateTest.java  |  15 ++
 .../K8sStandardInterpreterLauncherTest.java        |  43 ++---
 .../zeppelin/interpreter/launcher/KubectlTest.java | 105 -----------
 15 files changed, 274 insertions(+), 511 deletions(-)

diff --git a/LICENSE b/LICENSE
index d6460e9..5bd3e1a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -267,6 +267,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
     (Apache 2.0) Embedded MongoDB (https://github.com/flapdoodle-oss/de.flapdoodle.embed.mongo)
     (Apache 2.0) Kotlin (https://github.com/JetBrains/kotlin)
     (Apache 2.0) s3proxy (https://github.com/gaul/s3proxy)
+    (Apache 2.0) kubernetes-client (https://github.com/fabric8io/kubernetes-client)
 
 ========================================================================
 BSD 3-Clause licenses
diff --git a/docs/quickstart/kubernetes.md b/docs/quickstart/kubernetes.md
index cd45912..2cfbbca 100644
--- a/docs/quickstart/kubernetes.md
+++ b/docs/quickstart/kubernetes.md
@@ -264,5 +264,6 @@ Zeppelin can run locally (such as inside your IDE in debug mode) and able to run
 | ZEPPELIN_K8S_PORTFORWARD | true | Enable port forwarding from local Zeppelin instance to Interpreters running on Kubernetes |
 | ZEPPELIN_K8S_CONTAINER_IMAGE | <image>:<version> | Zeppelin interpreter docker image to use |
 | ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE | <image>:<version> | Spark docker image to use |
+| ZEPPELIN_K8S_NAMESPACE | <k8s namespace> | Kubernetes namespace  to use |
+| KUBERNETES_AUTH_TOKEN | <token> | Kubernetes auth token to create resources |
 
-`kubectl` command need to be configured to connect your Kubernetes cluster.
diff --git a/scripts/docker/zeppelin/bin/Dockerfile b/scripts/docker/zeppelin/bin/Dockerfile
index 6745b46..e1bcba8 100644
--- a/scripts/docker/zeppelin/bin/Dockerfile
+++ b/scripts/docker/zeppelin/bin/Dockerfile
@@ -97,13 +97,6 @@ RUN echo "$LOG_TAG Install R related packages" && \
     R -e "install.packages('Rcpp', repos='http://cran.us.r-project.org')" && \
     Rscript -e "library('devtools'); library('Rcpp'); install_github('ramnathv/rCharts')"
 
-# Install kubectl
-RUN apt-get install -y apt-transport-https && \
-    curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - && \
-    echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" | tee -a /etc/apt/sources.list.d/kubernetes.list && \
-    apt-get update && \
-    apt-get install -y kubectl
-
 RUN echo "$LOG_TAG Cleanup" && \
     apt-get autoclean && \
     apt-get clean
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index 7331a50..112bdbf 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -828,8 +828,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     return getBoolean(ConfVars.ZEPPELIN_K8S_PORTFORWARD);
   }
 
-  public String getK8sKubectlCmd() {
-    return getString(ConfVars.ZEPPELIN_K8S_KUBECTL);
+  public String getK8sNamepsace() {
+    return getString(ConfVars.ZEPPELIN_K8S_NAMESPACE);
   }
 
   public String getK8sContainerImage() {
@@ -1032,8 +1032,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
     ZEPPELIN_RUN_MODE("zeppelin.run.mode", "auto"),              // auto | local | k8s | Docker
 
     ZEPPELIN_K8S_PORTFORWARD("zeppelin.k8s.portforward", false), // kubectl port-forward incase of Zeppelin is running outside of kuberentes
-    ZEPPELIN_K8S_KUBECTL("zeppelin.k8s.kubectl", "kubectl"),     // kubectl command
     ZEPPELIN_K8S_CONTAINER_IMAGE("zeppelin.k8s.container.image", "apache/zeppelin:" + Util.getVersion()),
+    ZEPPELIN_K8S_NAMESPACE("zeppelin.k8s.namespace", "default"), // specify a namespace incase of Zeppelin is running outside of kuberentes
     ZEPPELIN_K8S_SPARK_CONTAINER_IMAGE("zeppelin.k8s.spark.container.image", "apache/spark:latest"),
     ZEPPELIN_K8S_TEMPLATE_DIR("zeppelin.k8s.template.dir", "k8s"),
     ZEPPELIN_K8S_SERVICE_NAME("zeppelin.k8s.service.name", "zeppelin-server"),
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
index 1fb2ea9..0d90fc4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java
@@ -17,23 +17,21 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import java.io.IOException;
+import java.util.Properties;
+
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.InterpreterRunner;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Properties;
-
 /**
  * Component to Launch interpreter process.
  */
 public abstract class InterpreterLauncher {
 
-  private static Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class);
-  private static String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`";
+  private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class);
+  private static final String SPECIAL_CHARACTER="{}()<>&*‘|=?;[]$–#~!.\"%/\\:+,`";
 
   protected ZeppelinConfiguration zConf;
   protected Properties properties;
@@ -89,14 +87,13 @@ public abstract class InterpreterLauncher {
               recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
       if (recoveredClient != null) {
         if (recoveredClient.isRunning()) {
-          LOGGER.info("Recover interpreter process running at {} of interpreter group: {}",
-                  recoveredClient.getHost() + ":" + recoveredClient.getPort(),
+          LOGGER.info("Recover interpreter process running at {}:{} of interpreter group: {}",
+                  recoveredClient.getHost(), recoveredClient.getPort(),
                   recoveredClient.getInterpreterGroupId());
           return recoveredClient;
         } else {
           recoveryStorage.removeInterpreterClient(context.getInterpreterGroupId());
-          LOGGER.warn("Unable to recover interpreter process: " + recoveredClient.getHost() + ":"
-                  + recoveredClient.getPort() + ", as it is already terminated.");
+          LOGGER.warn("Unable to recover interpreter process: {}:{}, as it is already terminated.", recoveredClient.getHost(), recoveredClient.getPort());
         }
       }
     }
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
index 4f7c9a5..5a0a3ab 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java
@@ -24,7 +24,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -40,14 +39,16 @@ import java.util.Collections;
  *
  */
 public class RemoteInterpreterUtils {
-  static Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterUtils.class);
 
+  private RemoteInterpreterUtils() {
+    throw new IllegalStateException("Utility class");
+  }
 
   public static int findRandomAvailablePortOnAllLocalInterfaces() throws IOException {
     int port;
     try (ServerSocket socket = new ServerSocket(0);) {
       port = socket.getLocalPort();
-      socket.close();
     }
     return port;
   }
@@ -117,25 +118,14 @@ public class RemoteInterpreterUtils {
   }
 
   public static boolean checkIfRemoteEndpointAccessible(String host, int port) {
-    try {
-      Socket discover = new Socket();
+    try (Socket discover = new Socket()) {
       discover.setSoTimeout(1000);
       discover.connect(new InetSocketAddress(host, port), 1000);
-      discover.close();
       return true;
-    } catch (ConnectException cne) {
+    } catch (IOException e) {
       // end point is not accessible
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " +
-            "(might be initializing): " + cne.getMessage());
-      }
-      return false;
-    } catch (IOException ioe) {
-      // end point is not accessible
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Remote endpoint '" + host + ":" + port + "' is not accessible " +
-            "(might be initializing): " + ioe.getMessage());
-      }
+      LOGGER.debug("Remote endpoint '{}:{}' is not accessible " +
+            "(might be initializing): {}" , host, port, e.getMessage());
       return false;
     }
   }
@@ -143,7 +133,7 @@ public class RemoteInterpreterUtils {
   public static String getInterpreterSettingId(String intpGrpId) {
     String settingId = null;
     if (intpGrpId != null) {
-      int indexOfColon = intpGrpId.indexOf("-");
+      int indexOfColon = intpGrpId.indexOf('-');
       settingId = intpGrpId.substring(0, indexOfColon);
     }
     return settingId;
diff --git a/zeppelin-plugins/launcher/k8s-standard/pom.xml b/zeppelin-plugins/launcher/k8s-standard/pom.xml
index a5a385c..077408e 100644
--- a/zeppelin-plugins/launcher/k8s-standard/pom.xml
+++ b/zeppelin-plugins/launcher/k8s-standard/pom.xml
@@ -37,22 +37,50 @@
 
     <properties>
         <plugin.name>Launcher/K8sStandardInterpreterLauncher</plugin.name>
+        <kubernetes.client.version>4.10.2</kubernetes.client.version>
+        <jinjava.version>2.5.4</jinjava.version>
     </properties>
 
     <dependencies>
         <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-client</artifactId>
+            <version>${kubernetes.client.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
             <groupId>com.hubspot.jinjava</groupId>
             <artifactId>jinjava</artifactId>
-            <version>2.4.12</version>
+            <version>${jinjava.version}</version>
+            <scope>compile</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- Test libraries -->
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-server-mock</artifactId>
+            <version>${kubernetes.client.version}</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
-                <artifactId>maven-dependency-plugin</artifactId>
-            </plugin>
-            <plugin>
                 <artifactId>maven-enforcer-plugin</artifactId>
                 <executions>
                     <execution>
@@ -61,6 +89,47 @@
                     </execution>
                 </executions>
             </plugin>
+            <!-- Shade the whole plugin, because spark provides an other version of okio -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <relocations>
+                        <relocation>
+                            <pattern>okio</pattern>
+                            <shadedPattern>org.apache.zeppelin.shaded.okio</shadedPattern>
+                        </relocation>
+                    </relocations>
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+            </plugin>
+            <!--  disable maven-dependency-plugin with execution copy-plugin-dependencies because we shade the whole dependency -->
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-plugin-dependencies</id>
+                        <phase>none</phase>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index 5735c31..5df2f02 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -1,17 +1,16 @@
 package org.apache.zeppelin.interpreter.launcher;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.hubspot.jinjava.Jinjava;
-import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
@@ -19,10 +18,23 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.hubspot.jinjava.Jinjava;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.LocalPortForward;
+import io.fabric8.kubernetes.client.dsl.ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable;
+
 public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   private static final Logger LOGGER = LoggerFactory.getLogger(K8sRemoteInterpreterProcess.class);
   private static final int K8S_INTERPRETER_SERVICE_PORT = 12321;
-  private final Kubectl kubectl;
+  private final KubernetesClient client;
+  private final String namespace;
   private final String interpreterGroupId;
   private final String interpreterGroupName;
   private final String interpreterSettingName;
@@ -31,15 +43,13 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   private final Properties properties;
   private final Map<String, String> envs;
 
-  private final Gson gson = new Gson();
   private final String podName;
   private final boolean portForward;
   private final String sparkImage;
-  private ExecuteWatchdog portForwardWatchdog;
+  private LocalPortForward localPortForward;
   private int podPort = K8S_INTERPRETER_SERVICE_PORT;
 
   private final boolean isUserImpersonatedForSpark;
-  private String userName;
 
   private AtomicBoolean started = new AtomicBoolean(false);
   private Random rand = new Random();
@@ -50,7 +60,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   private static final String ENV_SERVICE_DOMAIN = "SERVICE_DOMAIN";
 
   public K8sRemoteInterpreterProcess(
-          Kubectl kubectl,
+          KubernetesClient client,
+          String namespace,
           File specTemplates,
           String containerImage,
           String interpreterGroupId,
@@ -66,7 +77,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
           boolean isUserImpersonatedForSpark
   ) {
     super(connectTimeout, intpEventServerHost, intpEventServerPort);
-    this.kubectl = kubectl;
+    this.client = client;
+    this.namespace = namespace;
     this.specTempaltes = specTemplates;
     this.containerImage = containerImage;
     this.interpreterGroupId = interpreterGroupId;
@@ -85,11 +97,18 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
    * Get interpreter pod name
    * @return
    */
-  @VisibleForTesting
-  String getPodName() {
+  public String getPodName() {
     return podName;
   }
 
+  /**
+   * Get namespace
+   * @return
+   */
+  public String getNamespace() {
+    return namespace;
+  }
+
   @Override
   public String getInterpreterGroupId() {
     return interpreterGroupId;
@@ -102,76 +121,71 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
 
   @Override
   public void start(String userName) throws IOException {
-    /**
-     * If a spark interpreter process is running, userName is set in preparation for --proxy-user
-     */
-    if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) {
-      this.userName = userName;
-    } else {
-      this.userName = null;
-    }
+
+    Properties templateProperties = getTemplateBindings(userName);
     // create new pod
-    apply(specTempaltes, false);
-    kubectl.wait(String.format("pod/%s", getPodName()), "condition=Ready", getConnectTimeout()/1000);
+    apply(specTempaltes, false, templateProperties);
 
     if (portForward) {
       podPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
-      portForwardWatchdog = kubectl.portForward(
-          String.format("pod/%s", getPodName()),
-          new String[] {
-              String.format("%s:%s", podPort, K8S_INTERPRETER_SERVICE_PORT)
-          });
+      localPortForward = client.pods().inNamespace(namespace).withName(podName).portForward(K8S_INTERPRETER_SERVICE_PORT, podPort);
     }
 
     long startTime = System.currentTimeMillis();
+    long timeoutTime = startTime + getConnectTimeout();
 
     // wait until interpreter send started message through thrift rpc
     synchronized (started) {
-      if (!started.get()) {
+      while (!started.get()) {
+        long timetoTimeout = timeoutTime - System.currentTimeMillis();
+        if (timetoTimeout <= 0) {
+          stop();
+          throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
+        }
         try {
-          started.wait(getConnectTimeout());
+          started.wait(timetoTimeout);
         } catch (InterruptedException e) {
-          LOGGER.error("Remote interpreter is not accessible");
+          LOGGER.error("Interrupt received. Try to stop the interpreter and interrupt the current thread.", e);
+          stop();
+          Thread.currentThread().interrupt();
         }
       }
     }
 
-    if (!started.get()) {
-      LOGGER.info("Interpreter pod creation is time out in {} seconds", getConnectTimeout()/1000);
-    }
-
     // waits for interpreter thrift rpc server ready
-    while (System.currentTimeMillis() - startTime < getConnectTimeout()) {
-      if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
-        break;
-      } else {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-        }
+    while (!RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
+      if (System.currentTimeMillis() - timeoutTime > 0) {
+        stop();
+        throw new IOException("Launching zeppelin interpreter on kubernetes is time out, kill it now");
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOGGER.error("Interrupt received. Try to stop the interpreter and interrupt the current thread.", e);
+        stop();
+        Thread.currentThread().interrupt();
       }
     }
   }
 
   @Override
   public void stop() {
+    Properties templateProperties = getTemplateBindings(null);
     // delete pod
     try {
-      apply(specTempaltes, true);
+      apply(specTempaltes, true, templateProperties);
     } catch (IOException e) {
       LOGGER.info("Error on removing interpreter pod", e);
     }
-
-    try {
-      kubectl.wait(String.format("pod/%s", getPodName()), "delete", 60);
-    } catch (IOException e) {
-      LOGGER.debug("Error on waiting pod delete", e);
-    }
-
-
-    if (portForwardWatchdog != null) {
-      portForwardWatchdog.destroyProcess();
+    if (portForward) {
+        try {
+            localPortForward.close();
+        } catch (IOException e) {
+            LOGGER.info("Error on closing portforwarder", e);
+        }
     }
+    // Shutdown connection
+    shutdown();
   }
 
   @Override
@@ -194,40 +208,24 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
       if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort())) {
         return true;
       }
-
-      String ret = kubectl.execAndGet(new String[]{
-              "get",
-              String.format("pods/%s", getPodName()),
-              "-o",
-              "json"
-      });
-
-      if (ret == null) {
-        return false;
-      }
-
-      Map<String, Object> pod = gson.fromJson(ret, new TypeToken<Map<String, Object>>() {}.getType());
-      if (pod == null || !pod.containsKey("status")) {
-        return false;
-      }
-
-      Map<String, Object> status = (Map<String, Object>) pod.get("status");
-      if (status == null || !status.containsKey("phase")) {
-        return false;
+      Pod pod = client.pods().inNamespace(namespace).withName(podName).get();
+      if (pod != null) {
+        PodStatus status = pod.getStatus();
+        if (status != null) {
+          return "Running".equals(status.getPhase()) && started.get();
+        }
       }
-
-      return "Running".equals(status.get("phase")) && started.get();
     } catch (Exception e) {
       LOGGER.error("Can't get pod status", e);
-      return false;
     }
+    return false;
   }
 
   /**
    * Apply spec file(s) in the path.
    * @param path
    */
-  void apply(File path, boolean delete) throws IOException {
+  void apply(File path, boolean delete, Properties templateProperties) throws IOException {
     if (path.getName().startsWith(".") || path.isHidden() || path.getName().endsWith("~")) {
       LOGGER.info("Skip {}", path.getAbsolutePath());
     }
@@ -240,18 +238,19 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
       }
 
       for (File f : files) {
-        apply(f, delete);
+        apply(f, delete, templateProperties);
       }
     } else if (path.isFile()) {
-      LOGGER.info("Apply {}", path.getAbsolutePath());
       K8sSpecTemplate specTemplate = new K8sSpecTemplate();
-      specTemplate.loadProperties(getTemplateBindings());
-
-      String spec = specTemplate.render(path);
+      specTemplate.loadProperties(templateProperties);
+      String template = specTemplate.render(path);
+      ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean> k8sObjects = client.load(IOUtils.toInputStream(template, StandardCharsets.UTF_8));
+      LOGGER.info("Apply {} with {} K8s Objects", path.getAbsolutePath(), k8sObjects.get().size());
+      LOGGER.debug(template);
       if (delete) {
-        kubectl.delete(spec);
+        k8sObjects.inNamespace(namespace).delete();
       } else {
-        kubectl.apply(spec);
+        k8sObjects.inNamespace(namespace).createOrReplace();
       }
     } else {
       LOGGER.error("Can't apply {}", path.getAbsolutePath());
@@ -259,11 +258,11 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   }
 
   @VisibleForTesting
-  Properties getTemplateBindings() {
+  Properties getTemplateBindings(String userName) {
     Properties k8sProperties = new Properties();
 
     // k8s template properties
-    k8sProperties.put("zeppelin.k8s.namespace", kubectl.getNamespace());
+    k8sProperties.put("zeppelin.k8s.namespace", getNamespace());
     k8sProperties.put("zeppelin.k8s.interpreter.pod.name", getPodName());
     k8sProperties.put("zeppelin.k8s.interpreter.container.name", interpreterGroupName.toLowerCase());
     k8sProperties.put("zeppelin.k8s.interpreter.container.image", containerImage);
@@ -287,7 +286,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
       int webUiPort = 4040;
       k8sProperties.put("zeppelin.k8s.spark.container.image", sparkImage);
       if (isSparkOnKubernetes(properties)) {
-        envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions());
+        envs.put("SPARK_SUBMIT_OPTIONS", envs.getOrDefault("SPARK_SUBMIT_OPTIONS", "") + buildSparkSubmitOptions(userName));
       }
       envs.put("SPARK_HOME", envs.getOrDefault("SPARK_HOME", "/spark"));
 
@@ -355,7 +354,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   }
 
   @VisibleForTesting
-  String buildSparkSubmitOptions() {
+  String buildSparkSubmitOptions(String userName) {
     StringBuilder options = new StringBuilder();
 
     options.append(" --master k8s://https://kubernetes.default.svc");
@@ -363,10 +362,10 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     if (properties.containsKey(SPARK_DRIVER_MEMROY)) {
       options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMROY));
     }
-    if (userName != null) {
+    if (isUserImpersonatedForSpark && !StringUtils.containsIgnoreCase(userName, "anonymous") && isSpark()) {
       options.append(" --proxy-user " + userName);
     }
-    options.append(" --conf spark.kubernetes.namespace=" + kubectl.getNamespace());
+    options.append(" --conf spark.kubernetes.namespace=" + getNamespace());
     options.append(" --conf spark.executor.instances=1");
     options.append(" --conf spark.kubernetes.driver.pod.name=" + getPodName());
     options.append(" --conf spark.kubernetes.container.image=" + sparkImage);
@@ -381,7 +380,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   private String getInterpreterPodDnsName() {
     return String.format("%s.%s.svc",
         getPodName(), // service name and pod name is the same
-        kubectl.getNamespace());
+        getNamespace());
   }
 
   /**
@@ -431,7 +430,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     LOGGER.info("Interpreter pod created {}:{}", host, port);
     synchronized (started) {
       started.set(true);
-      started.notify();
+      started.notifyAll();
     }
   }
 
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java
index 2ed2c13..5abfb48 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplate.java
@@ -16,9 +16,6 @@
  */
 package org.apache.zeppelin.interpreter.launcher;
 
-import com.hubspot.jinjava.Jinjava;
-import org.apache.commons.io.FileUtils;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -27,6 +24,11 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.commons.io.FileUtils;
+
+import com.hubspot.jinjava.Jinjava;
+import com.hubspot.jinjava.JinjavaConfig;
+
 public class K8sSpecTemplate extends HashMap<String, Object> {
   public String render(File templateFile) throws IOException {
     String template = FileUtils.readFileToString(templateFile, Charset.defaultCharset());
@@ -37,7 +39,8 @@ public class K8sSpecTemplate extends HashMap<String, Object> {
     ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-      Jinjava jinja = new Jinjava();
+      JinjavaConfig config = JinjavaConfig.newBuilder().withLstripBlocks(true).withTrimBlocks(true).build();
+      Jinjava jinja = new Jinjava(config);
       return jinja.render(template, this);
     } finally {
       Thread.currentThread().setContextClassLoader(oldCl);
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
index d1df0f3..799a91c 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java
@@ -18,22 +18,24 @@
 package org.apache.zeppelin.interpreter.launcher;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
 
 /**
  * Interpreter Launcher which use shell script to launch the interpreter process.
@@ -41,23 +43,14 @@ import java.util.Map;
 public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class);
-  private final Kubectl kubectl;
   private InterpreterLaunchContext context;
-
+  private final KubernetesClient client;
 
   public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) throws IOException {
     super(zConf, recoveryStorage);
-    kubectl = new Kubectl(zConf.getK8sKubectlCmd());
-    kubectl.setNamespace(getNamespace());
+    client = new DefaultKubernetesClient();
   }
 
-  @VisibleForTesting
-  K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage, Kubectl kubectl) {
-    super(zConf, recoveryStorage);
-    this.kubectl = kubectl;
-  }
-
-
   /**
    * Check if i'm running inside of kubernetes or not.
    * It should return truth regardless of ZeppelinConfiguration.getRunMode().
@@ -79,9 +72,9 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
    */
   String getNamespace() throws IOException {
     if (isRunningOnKubernetes()) {
-      return readFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace", Charset.defaultCharset()).trim();
+      return readFile(Config.KUBERNETES_NAMESPACE_PATH, Charset.defaultCharset()).trim();
     } else {
-      return "default";
+      return zConf.getK8sNamepsace();
     }
   }
 
@@ -143,10 +136,10 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
     LOGGER.info("Launching Interpreter: {}", context.getInterpreterSettingGroup());
     this.context = context;
     this.properties = context.getProperties();
-    int connectTimeout = getConnectTimeout();
 
     return new K8sRemoteInterpreterProcess(
-            kubectl,
+            client,
+            getNamespace(),
             new File(zConf.getK8sTemplatesDir(), "interpreter"),
             zConf.getK8sContainerImage(),
             context.getInterpreterGroupId(),
@@ -158,7 +151,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher {
             getZeppelinServiceRpcPort(),
             zConf.getK8sPortForward(),
             zConf.getK8sSparkContainerImage(),
-            connectTimeout,
+            getConnectTimeout(),
             isUserImpersonateForSparkInterpreter(context));
   }
 
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
deleted file mode 100644
index a0ceb0a..0000000
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/Kubectl.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.interpreter.launcher;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.commons.exec.*;
-import org.apache.commons.io.IOUtils;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Kubectl {
-  private static final Logger LOGGER = LoggerFactory.getLogger(Kubectl.class);
-  private final String kubectlCmd;
-  private String namespace;
-
-  public Kubectl(String kubectlCmd) {
-    this.kubectlCmd = kubectlCmd;
-  }
-
-  /**
-   * Override namespace. Otherwise use namespace provided in schema
-   * @param namespace
-   */
-  public void setNamespace(String namespace) {
-    this.namespace = namespace;
-  }
-
-  public String getNamespace() {
-    return namespace;
-  }
-
-  public String apply(String spec) throws IOException {
-    return execAndGet(new String[]{"apply", "-f", "-"}, spec);
-  }
-
-  public String delete(String spec) throws IOException {
-    return execAndGet(new String[]{"delete", "-f", "-"}, spec);
-  }
-
-  public String wait(String resource, String waitFor, int timeoutSec) throws IOException {
-    try {
-      return execAndGet(new String[]{
-          "wait",
-          resource,
-          String.format("--for=%s", waitFor),
-          String.format("--timeout=%ds", timeoutSec)});
-    } catch (IOException e) {
-      if ("delete".equals(waitFor) && e.getMessage().contains("NotFound")) {
-        LOGGER.info("{} Not found. Maybe already deleted.", resource);
-        return "";
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  public ExecuteWatchdog portForward(String resource, String [] ports) throws IOException {
-    DefaultExecutor executor = new DefaultExecutor();
-    CommandLine cmd = new CommandLine(kubectlCmd);
-    cmd.addArguments("port-forward");
-    cmd.addArguments(resource);
-    cmd.addArguments(ports);
-
-    ExecuteWatchdog watchdog = new ExecuteWatchdog(-1);
-    executor.setWatchdog(watchdog);
-
-    executor.execute(cmd, new ExecuteResultHandler() {
-      @Override
-      public void onProcessComplete(int i) {
-        LOGGER.info("Port-forward stopped");
-      }
-
-      @Override
-      public void onProcessFailed(ExecuteException e) {
-        LOGGER.debug("port-forward process exit", e);
-      }
-    });
-
-    return watchdog;
-  }
-
-  String execAndGet(String [] args) throws IOException {
-    return execAndGet(args, "");
-  }
-
-  @VisibleForTesting
-  String execAndGet(String [] args, String stdin) throws IOException {
-    InputStream ins = IOUtils.toInputStream(stdin, StandardCharsets.UTF_8);
-    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
-    ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-    ArrayList<String> argsToOverride = new ArrayList<>(Arrays.asList(args));
-
-    // set namespace
-    if (namespace != null) {
-      argsToOverride.add("--namespace=" + namespace);
-    }
-
-    LOGGER.info("kubectl {}", argsToOverride);
-    LOGGER.debug(stdin);
-
-    try {
-      int exitCode = execute(
-              argsToOverride.toArray(new String[0]),
-              ins,
-              stdout,
-              stderr
-      );
-
-      if (exitCode == 0) {
-        return new String(stdout.toByteArray());
-      } else {
-        String output = new String(stderr.toByteArray());
-        throw new IOException(String.format("non zero return code (%d). %s", exitCode, output));
-      }
-    } catch (Exception e) {
-      String output = new String(stderr.toByteArray());
-      throw new IOException(output, e);
-    }
-  }
-
-  public int execute(String [] args, InputStream stdin, OutputStream stdout, OutputStream stderr) throws IOException {
-    DefaultExecutor executor = new DefaultExecutor();
-    CommandLine cmd = new CommandLine(kubectlCmd);
-    cmd.addArguments(args);
-
-    ExecuteWatchdog watchdog = new ExecuteWatchdog(60 * 1000L);
-    executor.setWatchdog(watchdog);
-
-    PumpStreamHandler streamHandler = new PumpStreamHandler(stdout, stderr, stdin);
-    executor.setStreamHandler(streamHandler);
-    return executor.execute(cmd);
-  }
-}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 26467a0..ad4d5fd 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -17,32 +17,34 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Properties;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.junit.Rule;
+import org.junit.Test;
+
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
 
 public class K8sRemoteInterpreterProcessTest {
 
+  @Rule
+  public KubernetesServer server = new KubernetesServer(true, true);
+
   @Test
   public void testGetHostPort() {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     Properties properties = new Properties();
     HashMap<String, String> envs = new HashMap<String, String>();
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        kubectl,
+        server.getClient(),
+        "default",
         new File(".skip"),
         "interpreter-container:1.0",
         "shared_process",
@@ -58,21 +60,19 @@ public class K8sRemoteInterpreterProcessTest {
         false);
 
     // then
-    assertEquals(String.format("%s.%s.svc", intp.getPodName(), kubectl.getNamespace()), intp.getHost());
+    assertEquals(String.format("%s.%s.svc", intp.getPodName(), "default"), intp.getHost());
     assertEquals(12321, intp.getPort());
   }
 
   @Test
   public void testPredefinedPortNumbers() {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     Properties properties = new Properties();
     HashMap<String, String> envs = new HashMap<String, String>();
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        kubectl,
+        server.getClient(),
+        "default",
         new File(".skip"),
         "interpreter-container:1.0",
         "shared_process",
@@ -98,16 +98,14 @@ public class K8sRemoteInterpreterProcessTest {
   @Test
   public void testGetTemplateBindings() throws IOException {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
     HashMap<String, String> envs = new HashMap<String, String>();
     envs.put("MY_ENV1", "V1");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        kubectl,
+        server.getClient(),
+        "default",
         new File(".skip"),
         "interpreter-container:1.0",
         "shared_process",
@@ -123,7 +121,7 @@ public class K8sRemoteInterpreterProcessTest {
         false);
 
     // when
-    Properties p = intp.getTemplateBindings();
+    Properties p = intp.getTemplateBindings(null);
 
     // then
     assertEquals("default", p.get("zeppelin.k8s.namespace"));
@@ -148,9 +146,6 @@ public class K8sRemoteInterpreterProcessTest {
   @Test
   public void testGetTemplateBindingsForSpark() throws IOException {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
     properties.put("spark.master", "k8s://http://api");
@@ -160,7 +155,8 @@ public class K8sRemoteInterpreterProcessTest {
     envs.put("SERVICE_DOMAIN", "mydomain");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        kubectl,
+        server.getClient(),
+        "default",
         new File(".skip"),
         "interpreter-container:1.0",
         "shared_process",
@@ -176,8 +172,7 @@ public class K8sRemoteInterpreterProcessTest {
         false);
 
     // when
-    intp.start("mytestUser");
-    Properties p = intp.getTemplateBindings();
+    Properties p = intp.getTemplateBindings("mytestUser");
 
     // then
     assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
@@ -188,7 +183,7 @@ public class K8sRemoteInterpreterProcessTest {
 
     String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
     assertTrue(sparkSubmitOptions.startsWith("my options "));
-    assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace()));
+    assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
     assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
@@ -201,9 +196,6 @@ public class K8sRemoteInterpreterProcessTest {
   @Test
   public void testGetTemplateBindingsForSparkWithProxyUser() throws IOException {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
     properties.put("spark.master", "k8s://http://api");
@@ -213,7 +205,8 @@ public class K8sRemoteInterpreterProcessTest {
     envs.put("SERVICE_DOMAIN", "mydomain");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        kubectl,
+        server.getClient(),
+        "default",
         new File(".skip"),
         "interpreter-container:1.0",
         "shared_process",
@@ -229,8 +222,7 @@ public class K8sRemoteInterpreterProcessTest {
         true);
 
     // when
-    intp.start("mytestUser");
-    Properties p = intp.getTemplateBindings();
+    Properties p = intp.getTemplateBindings("mytestUser");
     // then
     assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
     assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
@@ -240,7 +232,7 @@ public class K8sRemoteInterpreterProcessTest {
 
     String sparkSubmitOptions = envs.get("SPARK_SUBMIT_OPTIONS");
     assertTrue(sparkSubmitOptions.startsWith("my options "));
-    assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=" + kubectl.getNamespace()));
+    assertTrue(sparkSubmitOptions.contains("spark.kubernetes.namespace=default"));
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.driver.pod.name=" + intp.getPodName()));
     assertTrue(sparkSubmitOptions.contains("spark.kubernetes.container.image=spark-container:1.0"));
     assertTrue(sparkSubmitOptions.contains("spark.driver.host=" + intp.getHost()));
@@ -253,9 +245,6 @@ public class K8sRemoteInterpreterProcessTest {
   @Test
   public void testGetTemplateBindingsForSparkWithProxyUserAnonymous() throws IOException {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     Properties properties = new Properties();
     properties.put("my.key1", "v1");
     properties.put("spark.master", "k8s://http://api");
@@ -265,7 +254,8 @@ public class K8sRemoteInterpreterProcessTest {
     envs.put("SERVICE_DOMAIN", "mydomain");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        kubectl,
+        server.getClient(),
+        "default",
         new File(".skip"),
         "interpreter-container:1.0",
         "shared_process",
@@ -281,8 +271,7 @@ public class K8sRemoteInterpreterProcessTest {
         true);
 
     // when
-    intp.start("anonymous");
-    Properties p = intp.getTemplateBindings();
+    Properties p = intp.getTemplateBindings("anonymous");
     // then
     assertEquals("spark-container:1.0", p.get("zeppelin.k8s.spark.container.image"));
     assertEquals(String.format("//4040-%s.%s", intp.getPodName(), "mydomain"), p.get("zeppelin.spark.uiWebUrl"));
@@ -298,15 +287,13 @@ public class K8sRemoteInterpreterProcessTest {
   @Test
   public void testSparkUiWebUrlTemplate() {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     Properties properties = new Properties();
     HashMap<String, String> envs = new HashMap<String, String>();
     envs.put("SERVICE_DOMAIN", "mydomain");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        kubectl,
+        server.getClient(),
+        "default",
         new File(".skip"),
         "interpreter-container:1.0",
         "shared_process",
@@ -341,9 +328,6 @@ public class K8sRemoteInterpreterProcessTest {
   @Test
   public void testSparkPodResources() {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     Properties properties = new Properties();
     properties.put("spark.driver.memory", "1g");
     properties.put("spark.driver.cores", "1");
@@ -351,7 +335,8 @@ public class K8sRemoteInterpreterProcessTest {
     envs.put("SERVICE_DOMAIN", "mydomain");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        kubectl,
+        server.getClient(),
+        "default",
         new File(".skip"),
         "interpreter-container:1.0",
         "shared_process",
@@ -367,7 +352,7 @@ public class K8sRemoteInterpreterProcessTest {
         false);
 
     // when
-    Properties p = intp.getTemplateBindings();
+    Properties p = intp.getTemplateBindings(null);
 
     // then
     assertEquals("1", p.get("zeppelin.k8s.interpreter.cores"));
@@ -377,9 +362,6 @@ public class K8sRemoteInterpreterProcessTest {
   @Test
   public void testSparkPodResourcesMemoryOverhead() {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     Properties properties = new Properties();
     properties.put("spark.driver.memory", "1g");
     properties.put("spark.driver.memoryOverhead", "256m");
@@ -388,7 +370,8 @@ public class K8sRemoteInterpreterProcessTest {
     envs.put("SERVICE_DOMAIN", "mydomain");
 
     K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
-        kubectl,
+        server.getClient(),
+        "default",
         new File(".skip"),
         "interpreter-container:1.0",
         "shared_process",
@@ -404,7 +387,7 @@ public class K8sRemoteInterpreterProcessTest {
         false);
 
     // when
-    Properties p = intp.getTemplateBindings();
+    Properties p = intp.getTemplateBindings(null);
 
     // then
     assertEquals("5", p.get("zeppelin.k8s.interpreter.cores"));
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java
index daf3773..f859cab 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sSpecTemplateTest.java
@@ -51,6 +51,21 @@ public class K8sSpecTemplateTest {
   }
 
   @Test
+  public void testRenderWithStrip() {
+    // given
+    K8sSpecTemplate template = new K8sSpecTemplate();
+    template.put("test", "test");
+      // when
+    String spec = template.render(
+          "  {% if test == \"test\" %}\n" +
+          "  After commit\n" +
+          "  {% endif %}\n");
+
+    // then
+    assertEquals("  After commit\n", spec);
+  }
+
+  @Test
   public void testIterate() {
     // given
     K8sSpecTemplate template = new K8sSpecTemplate();
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
index c580a43..ff47dd0 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java
@@ -17,18 +17,16 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
-import org.apache.zeppelin.conf.ZeppelinConfiguration;
-import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Properties;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * In the future, test may use minikube on travis for end-to-end test
@@ -46,11 +44,8 @@ public class K8sStandardInterpreterLauncherTest {
   @Test
   public void testK8sLauncher() throws IOException {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl);
+    K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
     properties.setProperty("ENV_1", "VALUE_1");
     properties.setProperty("property_1", "value_1");
@@ -78,11 +73,8 @@ public class K8sStandardInterpreterLauncherTest {
   @Test
   public void testK8sLauncherWithSparkAndUserImpersonate() throws IOException {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl);
+    K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
     properties.setProperty("ENV_1", "VALUE_1");
     properties.setProperty("property_1", "value_1");
@@ -108,22 +100,14 @@ public class K8sStandardInterpreterLauncherTest {
     assertTrue(client instanceof K8sRemoteInterpreterProcess);
     K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
     assertTrue(process.isSpark());
-
-    // when
-    process.start(context.getUserName());
-
-    // then
-    assertTrue(process.buildSparkSubmitOptions().contains("--proxy-user user1"));
+    assertTrue(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1"));
   }
 
   @Test
   public void testK8sLauncherWithSparkAndWithoutUserImpersonate() throws IOException {
     // given
-    Kubectl kubectl = mock(Kubectl.class);
-    when(kubectl.getNamespace()).thenReturn("default");
-
     ZeppelinConfiguration zConf = new ZeppelinConfiguration();
-    K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null, kubectl);
+    K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null);
     Properties properties = new Properties();
     properties.setProperty("ENV_1", "VALUE_1");
     properties.setProperty("property_1", "value_1");
@@ -149,11 +133,6 @@ public class K8sStandardInterpreterLauncherTest {
     assertTrue(client instanceof K8sRemoteInterpreterProcess);
     K8sRemoteInterpreterProcess process = (K8sRemoteInterpreterProcess) client;
     assertTrue(process.isSpark());
-
-    // when
-    process.start(context.getUserName());
-
-    // then
-    assertFalse(process.buildSparkSubmitOptions().contains("--proxy-user user1"));
+    assertFalse(process.buildSparkSubmitOptions(context.getUserName()).contains("--proxy-user user1"));
   }
 }
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java
deleted file mode 100644
index 072cf94..0000000
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/KubectlTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zeppelin.interpreter.launcher;
-
-import org.apache.commons.io.IOUtils;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class KubectlTest {
-
-  @Test(expected = IOException.class)
-  public void testKubeclCommandNotExists() throws IOException {
-    // given
-    Kubectl kubectl = new Kubectl("invalidcommand");
-    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
-    ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-
-    // when
-    kubectl.execute(new String[] {}, null, stdout, stderr);
-
-    // then throw IOException
-  }
-
-  @Test
-  public void testStdout() throws IOException {
-    // given
-    Kubectl kubectl = new Kubectl("echo");
-    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
-    ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-
-    // when
-    kubectl.execute(new String[] {"hello"}, null, stdout, stderr);
-
-    // then
-    assertEquals("hello\n", stdout.toString());
-    assertEquals("", stderr.toString());
-  }
-
-  @Test
-  public void testStderr() throws IOException {
-    // given
-    Kubectl kubectl = new Kubectl("sh");
-    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
-    ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-
-    // when
-    try {
-      kubectl.execute(new String[]{"-c", "yoyo"}, null, stdout, stderr);
-    } catch (IOException e) {
-    }
-
-    // then
-    assertEquals("", stdout.toString());
-    assertTrue(0 < stderr.toString().length());
-  }
-
-  @Test
-  public void testStdin() throws IOException {
-    // given
-    Kubectl kubectl = new Kubectl("wc");
-    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
-    ByteArrayOutputStream stderr = new ByteArrayOutputStream();
-    InputStream stdin = IOUtils.toInputStream("Hello");
-
-    // when
-    kubectl.execute(new String[]{"-c"}, stdin, stdout, stderr);
-
-    // then
-    assertEquals("5", stdout.toString().trim());
-    assertEquals("", stderr.toString());
-  }
-
-  @Test
-  public void testExecSpecAndGet() throws IOException {
-    // given
-    Kubectl kubectl = new Kubectl("cat");
-    String spec = "{'k1': 'v1', 'k2': 2}";
-
-    // when
-    String result = kubectl.execAndGet(new String[]{}, spec);
-
-    // then
-    assertEquals(spec, result);
-  }
-}