You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/10/11 12:30:08 UTC

[zeppelin] branch master updated: [ZEPPELIN-5352] Support flink in k8s mode

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 28bc972  [ZEPPELIN-5352] Support flink in k8s mode
28bc972 is described below

commit 28bc9722700905777f2304a843a126c302d2023e
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Oct 8 20:57:29 2021 +0800

    [ZEPPELIN-5352] Support flink in k8s mode
    
    ### What is this PR for?
    
    Flink has supported k8s native mode since Flink 1.11; this PR adds support for k8s-application mode. The implementation is very similar to that of yarn-application mode.
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5352
    
    ### How should this be tested?
    * CI passes and manually tested
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4116 from zjffdu/ZEPPELIN-5352 and squashes the following commits:
    
    983a3793da [Jeff Zhang] address comment
    11d0c60af1 [Jeff Zhang] [ZEPPELIN-5352]. Support flink in k8s mode
---
 bin/interpreter.sh                                 |  6 +-
 ...va => ApplicationModeExecutionEnvironment.java} |  6 +-
 ....java => ApplicationModeStreamEnvironment.java} |  8 +--
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 32 ++++++++-
 .../zeppelin/flink/internal/FlinkILoop.scala       |  9 ++-
 .../zeppelin/flink/internal/FlinkShell.scala       | 20 +++++-
 .../zeppelin/integration/FlinkIntegrationTest.java |  2 +-
 .../remote/RemoteInterpreterServer.java            | 19 +++--
 .../launcher/FlinkInterpreterLauncher.java         | 83 +++++++++++++++++-----
 .../zeppelin/interpreter/InterpreterSetting.java   |  4 ++
 .../remote/ExecRemoteInterpreterProcess.java       |  9 ++-
 11 files changed, 148 insertions(+), 50 deletions(-)

diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index c75a299..6d9c048 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -304,9 +304,9 @@ if [[ -n "${SPARK_SUBMIT}" ]]; then
   else
     INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
   fi
-elif [[ "${ZEPPELIN_FLINK_YARN_APPLICATION}" == "true" ]]; then
-  IFS='|' read -r -a ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY <<< "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF}"
-  INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "yarn-application" "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
+elif [[ -n "${ZEPPELIN_FLINK_APPLICATION_MODE}" ]]; then
+  IFS='|' read -r -a ZEPPELIN_FLINK_APPLICATION_MODE_CONF_ARRAY <<< "${ZEPPELIN_FLINK_APPLICATION_MODE_CONF}"
+  INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "${ZEPPELIN_FLINK_APPLICATION_MODE}" "${ZEPPELIN_FLINK_APPLICATION_MODE_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
 else
   IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}"
   IFS=' ' read -r -a ZEPPELIN_INTP_MEM_ARRAY <<< "${ZEPPELIN_INTP_MEM}"
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
similarity index 94%
rename from flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
index 2cfd0e3..33afa03 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
@@ -37,15 +37,15 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 
 /**
- * ExecutionEnvironment used for yarn application mode.
+ * ExecutionEnvironment used for application mode.
  * Need to add jars of scala shell before submitting jobs.
  */
-public class YarnApplicationExecutionEnvironment extends ExecutionEnvironment {
+public class ApplicationModeExecutionEnvironment extends ExecutionEnvironment {
 
   private FlinkILoop flinkILoop;
   private FlinkScalaInterpreter flinkScalaInterpreter;
 
-  public YarnApplicationExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
+  public ApplicationModeExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
                                              Configuration configuration,
                                              ClassLoader userClassloader,
                                              FlinkILoop flinkILoop,
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeStreamEnvironment.java
similarity index 93%
rename from flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeStreamEnvironment.java
index 7f2fc92..b86e556 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeStreamEnvironment.java
@@ -40,17 +40,17 @@ import static org.apache.flink.util.Preconditions.checkState;
 
 
 /**
- * StreamExecutionEnvironment used for yarn application mode.
+ * StreamExecutionEnvironment used for application mode.
  * Need to add jars of scala shell before submitting jobs.
  */
-public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment {
+public class ApplicationModeStreamEnvironment extends StreamExecutionEnvironment {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(YarnApplicationStreamEnvironment.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationModeStreamEnvironment.class);
 
   private FlinkILoop flinkILoop;
   private FlinkScalaInterpreter flinkScalaInterpreter;
 
-  public YarnApplicationStreamEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
+  public ApplicationModeStreamEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
                                           Configuration configuration,
                                           ClassLoader userClassloader,
                                           FlinkILoop flinkILoop,
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 0918084..12051dd 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -18,7 +18,7 @@
 
 package org.apache.zeppelin.flink
 
-import java.io.File
+import java.io.{File, IOException}
 import java.net.{URL, URLClassLoader}
 import java.nio.file.Files
 import java.util.Properties
@@ -45,6 +45,7 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
 import org.apache.flink.table.module.hive.HiveModule
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli
 import org.apache.zeppelin.dep.DependencyResolver
+import org.apache.zeppelin.flink.internal.FlinkShell
 import org.apache.zeppelin.flink.internal.FlinkShell._
 import org.apache.zeppelin.flink.internal.FlinkILoop
 import org.apache.zeppelin.interpreter.Interpreter.FormType
@@ -145,7 +146,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
     // load udf jar
     this.userUdfJars.foreach(jar => loadUDFJar(jar))
 
-    if (mode == ExecutionMode.YARN_APPLICATION) {
+    if (ExecutionMode.isApplicationMode(mode)) {
       // have to call senv.execute method before running any user code, otherwise yarn application mode
       // will cause ClassNotFound issue. Needs to do more investigation. TODO(zjffdu)
       val initCode =
@@ -185,7 +186,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
       properties.getProperty("flink.execution.mode", "LOCAL")
         .replace("-", "_")
         .toUpperCase)
-    if (mode == ExecutionMode.YARN_APPLICATION) {
+    if (ExecutionMode.isYarnAppicationMode(mode)) {
       if (flinkVersion.isFlink110) {
         throw new Exception("yarn-application mode is only supported after Flink 1.11")
       }
@@ -195,6 +196,17 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
       flinkConfDir = workingDirectory
       hiveConfDir = workingDirectory
     }
+    if (ExecutionMode.isK8sApplicationMode(mode)) {
+      if (flinkVersion.isFlink110) {
+        throw new Exception("application mode is only supported after Flink 1.11")
+      }
+      // use current pod working directory as FLINK_HOME
+      val workingDirectory = new File(".").getAbsolutePath
+      flinkHome = workingDirectory
+      flinkConfDir = workingDirectory + "/conf"
+      hiveConfDir = workingDirectory + "/conf"
+    }
+
     LOGGER.info("FLINK_HOME: " + flinkHome)
     LOGGER.info("FLINK_CONF_DIR: " + flinkConfDir)
     LOGGER.info("HADOOP_CONF_DIR: " + hadoopConfDir)
@@ -234,7 +246,17 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
         .copy(queue = Some(queue))))
 
     this.userUdfJars = getUserUdfJars()
+
     this.userJars = getUserJarsExceptUdfJars ++ this.userUdfJars
+    if (ExecutionMode.isK8sApplicationMode(mode)) {
+      var flinkAppJar = properties.getProperty("flink.app.jar")
+      if (flinkAppJar != null && flinkAppJar.startsWith("local://")) {
+        flinkAppJar = flinkAppJar.substring(8)
+        this.userJars = this.userJars :+ flinkAppJar
+      } else {
+        throw new IOException("flink.app.jar is not set or invalid, flink.app.jar: " + flinkAppJar)
+      }
+    }
     LOGGER.info("UserJars: " + userJars.mkString(","))
     config = config.copy(externalJars = Some(userJars.toArray))
     LOGGER.info("Config: " + config)
@@ -317,6 +339,10 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
             LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
             this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
             this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
+          } else if (ExecutionMode.isK8sApplicationMode(mode)) {
+            LOGGER.info("Use FlinkCluster in kubernetes-application mode")
+            this.jmWebUrl = "http://localhost:" + configuration.getInteger("rest.port", 8081)
+            this.displayedJMWebUrl = this.jmWebUrl
           } else {
             LOGGER.info("Use FlinkCluster in remote mode")
             this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
index 86abeb5..1be64ab 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
@@ -23,13 +23,12 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.util.AbstractID
 import java.io.{BufferedReader, File, FileOutputStream, IOException}
-import java.net.URL
 
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.api.java.{ExecutionEnvironment => JExecutionEnvironment}
 import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader
-import org.apache.zeppelin.flink.{FlinkScalaInterpreter, YarnApplicationExecutionEnvironment, YarnApplicationStreamEnvironment}
+import org.apache.zeppelin.flink.{ApplicationModeExecutionEnvironment, ApplicationModeStreamEnvironment, FlinkScalaInterpreter}
 import FlinkShell.ExecutionMode
 
 import scala.tools.nsc.interpreter._
@@ -71,17 +70,17 @@ class FlinkILoop(
     scalaBenv: ExecutionEnvironment,
     scalaSenv: StreamExecutionEnvironment
     ) = {
-    if (mode == ExecutionMode.YARN_APPLICATION) {
+    if (ExecutionMode.isApplicationMode(mode)) {
       // For yarn application mode, ExecutionEnvironment & StreamExecutionEnvironment has already been created
       // by flink itself, we here just try get them via reflection and reconstruct them.
-      val scalaBenv = new ExecutionEnvironment(new YarnApplicationExecutionEnvironment(
+      val scalaBenv = new ExecutionEnvironment(new ApplicationModeExecutionEnvironment(
         getExecutionEnvironmentField(jenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
         getExecutionEnvironmentField(jenv, "configuration").asInstanceOf[Configuration],
         getExecutionEnvironmentField(jenv, "userClassloader").asInstanceOf[ClassLoader],
         this,
         flinkScalaInterpreter
       ))
-      val scalaSenv = new StreamExecutionEnvironment(new YarnApplicationStreamEnvironment(
+      val scalaSenv = new StreamExecutionEnvironment(new ApplicationModeStreamEnvironment(
         getStreamExecutionEnvironmentField(jsenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
         getStreamExecutionEnvironmentField(jsenv, "configuration").asInstanceOf[Configuration],
         getStreamExecutionEnvironmentField(jsenv, "userClassloader").asInstanceOf[ClassLoader],
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
index 437d188..ab0f299 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
@@ -18,7 +18,7 @@
 
 package org.apache.zeppelin.flink.internal
 
-import java.io.BufferedReader
+import java.io._
 
 import org.apache.flink.annotation.Internal
 import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser}
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfigurati
 import org.apache.flink.yarn.executors.YarnSessionClusterExecutor
 import org.apache.zeppelin.flink.FlinkShims
 
+
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -39,7 +40,19 @@ import scala.collection.mutable.ArrayBuffer
 object FlinkShell {
 
   object ExecutionMode extends Enumeration {
-    val UNDEFINED, LOCAL, REMOTE, YARN, YARN_APPLICATION = Value
+    val UNDEFINED, LOCAL, REMOTE, YARN, YARN_APPLICATION, KUBERNETES_APPLICATION = Value
+
+    def isYarnAppicationMode(mode: ExecutionMode.Value): Boolean = {
+      mode == ExecutionMode.YARN_APPLICATION
+    }
+
+    def isK8sApplicationMode(mode: ExecutionMode.Value): Boolean = {
+      mode == ExecutionMode.KUBERNETES_APPLICATION
+    }
+
+    def isApplicationMode(mode: ExecutionMode.Value): Boolean = {
+      isYarnAppicationMode(mode) || isK8sApplicationMode(mode)
+    }
   }
 
   /** Configuration object */
@@ -84,9 +97,10 @@ object FlinkShell {
       case ExecutionMode.REMOTE => createRemoteConfig(config, flinkConfig)
       case ExecutionMode.YARN => createYarnClusterIfNeededAndGetConfig(config, flinkConfig, flinkShims)
       case ExecutionMode.YARN_APPLICATION => (flinkConfig, None)
+      case ExecutionMode.KUBERNETES_APPLICATION => (flinkConfig, None)
       case ExecutionMode.UNDEFINED => // Wrong input
         throw new IllegalArgumentException("please specify execution mode:\n" +
-          "[local | remote <host> <port> | yarn | yarn-application ]")
+          "[local | remote <host> <port> | yarn | yarn-application | kubernetes-application]")
     }
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index f27acbd..2dc14f8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -153,7 +153,7 @@ public abstract class FlinkIntegrationTest {
     flinkInterpreterSetting.setProperty("FLINK_HOME", flinkHome);
     flinkInterpreterSetting.setProperty("PATH", hadoopHome + "/bin:" + System.getenv("PATH"));
     flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
-    flinkInterpreterSetting.setProperty("flink.execution.mode", "YARN");
+    flinkInterpreterSetting.setProperty("flink.execution.mode", "yarn");
     flinkInterpreterSetting.setProperty("zeppelin.flink.run.asLoginUser", "false");
 
     testInterpreterBasics();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index c058dd6..aff8139 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -77,7 +77,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.nio.ByteBuffer;
@@ -142,6 +141,12 @@ public class RemoteInterpreterServer extends Thread
   private ScheduledExecutorService resultCleanService = Executors.newSingleThreadScheduledExecutor();
 
   private boolean isTest;
+  // Whether calling System.exit to force shutdown interpreter process.
+  // In Flink K8s application mode, RemoteInterpreterServer#main is called via reflection by flink framework.
+  // We should not call System.exit in this scenario when RemoteInterpreterServer is stopped,
+  // Otherwise flink will think flink job is exited abnormally and will try to restart this
+  // pod (RemoteInterpreterServer)
+  private boolean isForceShutdown = true;
 
   private ZeppelinConfiguration zConf;
   // cluster manager client
@@ -321,7 +326,10 @@ public class RemoteInterpreterServer extends Thread
      * should be part of the next release and solve the problem.
      * We may have other threads that are not terminated successfully.
      */
-    System.exit(0);
+    if (remoteInterpreterServer.isForceShutdown) {
+      LOGGER.info("Force shutting down");
+      System.exit(0);
+    }
   }
 
   // Submit interpreter process metadata information to cluster metadata
@@ -380,11 +388,14 @@ public class RemoteInterpreterServer extends Thread
               replClass.getConstructor(new Class[]{Properties.class});
       Interpreter interpreter = constructor.newInstance(p);
       interpreter.setClassloaderUrls(new URL[]{});
-      LOGGER.info("Instantiate interpreter {}", className);
+
       interpreter.setInterpreterGroup(interpreterGroup);
       interpreter.setUserName(userName);
 
       interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId);
+
+      this.isForceShutdown = Boolean.parseBoolean(properties.getOrDefault("zeppelin.interpreter.forceShutdown", "true"));
+      LOGGER.info("Instantiate interpreter {}, isForceShutdown: {}", className, isForceShutdown);
     } catch (Exception e) {
       LOGGER.error(e.getMessage(), e);
       throw new InterpreterRPCException("Fail to create interpreter, cause: " + e.toString());
@@ -700,7 +711,7 @@ public class RemoteInterpreterServer extends Thread
         }
       }
 
-      if (server.isServing()) {
+      if (server.isServing() && isForceShutdown) {
         LOGGER.info("Force shutting down");
         System.exit(1);
       }
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index 3ff4f56..83a1f24 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -18,6 +18,7 @@
 package org.apache.zeppelin.interpreter.launcher;
 
 import com.google.common.base.CharMatcher;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -26,18 +27,20 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.StringJoiner;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FlinkInterpreterLauncher.class);
-
+  private static final Set<String> FLINK_EXECUTION_MODES = Sets.newHashSet(
+          "local", "remote", "yarn", "yarn-application", "kubernetes-application");
 
   public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
     super(zConf, recoveryStorage);
@@ -48,8 +51,8 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
           throws IOException {
     Map<String, String> envs = super.buildEnvFromProperties(context);
 
-    String flinkHome = updateEnvsForFlinkHome(envs, context);
-
+    // update FLINK related environment variables
+    String flinkHome = getFlinkHome(context);
     if (!envs.containsKey("FLINK_CONF_DIR")) {
       envs.put("FLINK_CONF_DIR", flinkHome + "/conf");
     }
@@ -59,14 +62,29 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
     normalizeConfiguration(context);
 
     String flinkExecutionMode = context.getProperties().getProperty("flink.execution.mode");
-    // yarn application mode specific logic
-    if ("yarn-application".equalsIgnoreCase(flinkExecutionMode)) {
-      updateEnvsForYarnApplicationMode(envs, context);
+    if (!FLINK_EXECUTION_MODES.contains(flinkExecutionMode)) {
+      throw new IOException("Not valid flink.execution.mode: " +
+              flinkExecutionMode + ", valid modes ares: " +
+              FLINK_EXECUTION_MODES.stream().collect(Collectors.joining(", ")));
+    }
+    // application mode specific logic
+    if (isApplicationMode(flinkExecutionMode)) {
+      updateEnvsForApplicationMode(flinkExecutionMode, envs, context);
     }
 
-    String flinkAppJar = chooseFlinkAppJar(flinkHome);
-    LOGGER.info("Choose FLINK_APP_JAR: {}", flinkAppJar);
-    envs.put("FLINK_APP_JAR", flinkAppJar);
+    if (isK8sApplicationMode(flinkExecutionMode)) {
+      String flinkAppJar = context.getProperties().getProperty("flink.app.jar");
+      if (StringUtils.isBlank(flinkAppJar)) {
+        throw new IOException("flink.app.jar is not specified for kubernetes-application mode");
+      }
+      envs.put("FLINK_APP_JAR", flinkAppJar);
+      LOGGER.info("K8s application's FLINK_APP_JAR : " + flinkAppJar);
+      context.getProperties().put("zeppelin.interpreter.forceShutdown", "false");
+    } else {
+      String flinkAppJar = chooseFlinkAppJar(flinkHome);
+      LOGGER.info("Choose FLINK_APP_JAR for non k8s-application mode: {}", flinkAppJar);
+      envs.put("FLINK_APP_JAR", flinkAppJar);
+    }
 
     if ("yarn".equalsIgnoreCase(flinkExecutionMode) ||
             "yarn-application".equalsIgnoreCase(flinkExecutionMode)) {
@@ -148,8 +166,28 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
     return flinkScalaJars.get(0).getAbsolutePath();
   }
 
-  private String updateEnvsForFlinkHome(Map<String, String> envs,
-                                      InterpreterLaunchContext context) throws IOException {
+  private boolean isApplicationMode(String mode) {
+    return isYarnApplicationMode(mode) || isK8sApplicationMode(mode);
+  }
+
+  private boolean isYarnApplicationMode(String mode) {
+    return "yarn-application".equals(mode);
+  }
+
+  private boolean isK8sApplicationMode(String mode) {
+    return "kubernetes-application".equals(mode);
+  }
+
+  /**
+   * Get FLINK_HOME in the following orders:
+   *    1. FLINK_HOME in interpreter setting
+   *    2. FLINK_HOME in system environment variables.
+   *
+   * @param context
+   * @return
+   * @throws IOException
+   */
+  private String getFlinkHome(InterpreterLaunchContext context) throws IOException {
     String flinkHome = context.getProperties().getProperty("FLINK_HOME");
     if (StringUtils.isBlank(flinkHome)) {
       flinkHome = System.getenv("FLINK_HOME");
@@ -168,11 +206,11 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
     return flinkHome;
   }
 
-  private void updateEnvsForYarnApplicationMode(Map<String, String> envs,
-                                                InterpreterLaunchContext context)
-          throws IOException {
-
-    envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
+  private void updateEnvsForApplicationMode(String mode,
+                                            Map<String, String> envs,
+                                            InterpreterLaunchContext context) throws IOException {
+    // ZEPPELIN_FLINK_APPLICATION_MODE is used in interpreter.sh
+    envs.put("ZEPPELIN_FLINK_APPLICATION_MODE", mode);
 
     StringJoiner flinkConfStringJoiner = new StringJoiner("|");
     // set yarn.ship-files
@@ -190,7 +228,7 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
       flinkConfStringJoiner.add("yarn.application.name=" + yarnAppName);
     }
 
-    // add other yarn and python configuration.
+    // add other configuration for both k8s and yarn
     for (Map.Entry<Object, Object> entry : context.getProperties().entrySet()) {
       String key = entry.getKey().toString();
       String value = entry.getValue().toString();
@@ -205,9 +243,16 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
         }
       }
     }
-    envs.put("ZEPPELIN_FLINK_YARN_APPLICATION_CONF", flinkConfStringJoiner.toString());
+    envs.put("ZEPPELIN_FLINK_APPLICATION_MODE_CONF", flinkConfStringJoiner.toString());
   }
 
+  /**
+   * Used in yarn-application mode.
+   *
+   * @param context
+   * @return
+   * @throws IOException
+   */
   private List<String> getYarnShipFiles(InterpreterLaunchContext context) throws IOException {
     // Extract yarn.ship-files, add hive-site.xml automatically if hive is enabled
     // and HIVE_CONF_DIR is specified
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 3b0728e..bff9273 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -762,6 +762,10 @@ public class InterpreterSetting {
 
   public String getLauncherPlugin(Properties properties) {
     if (isRunningOnKubernetes()) {
+      if (group.equals("flink")) {
+        // Flink has its own implementation of k8s mode.
+        return "FlinkInterpreterLauncher";
+      }
       return "K8sStandardInterpreterLauncher";
     } else if (isRunningOnCluster()) {
       return InterpreterSetting.CLUSTER_INTERPRETER_LAUNCHER_NAME;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
index f7b85a2..3b2142d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
@@ -229,9 +229,9 @@ public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProces
         synchronized (this) {
           notifyAll();
         }
-      } else if (isFlinkYarnApplicationMode() && exitValue == 0) {
+      } else if (isFlinkApplicationMode() && exitValue == 0) {
         // Don't update transition state when flink launcher process exist
-        // in yarn application mode.
+        // in flink application mode.
         synchronized (this) {
           notifyAll();
         }
@@ -252,9 +252,8 @@ public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProces
               getEnv().getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", "false"));
     }
 
-    private boolean isFlinkYarnApplicationMode() {
-      return Boolean.parseBoolean(
-              getEnv().getOrDefault("ZEPPELIN_FLINK_YARN_APPLICATION", "false"));
+    private boolean isFlinkApplicationMode() {
+      return getEnv().containsKey("ZEPPELIN_FLINK_APPLICATION_MODE");
     }
 
     @Override