Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/05/03 07:05:13 UTC

[zeppelin] branch master updated: [ZEPPELIN-5320]. Support yarn application mode for flink interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cc9fc1  [ZEPPELIN-5320]. Support yarn application mode for flink interpreter
4cc9fc1 is described below

commit 4cc9fc165092d641f0f8ba8be157878e47d72df0
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Apr 27 15:19:28 2021 +0800

    [ZEPPELIN-5320]. Support yarn application mode for flink interpreter
    
    ### What is this PR for?
    
    This PR adds support for yarn application mode in the flink interpreter. In yarn application mode, the flink interpreter runs inside the JobManager (JM). An integration test is added.
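    
    For illustration, a minimal sketch of the interpreter setting this mode assumes (the values below are placeholders, not taken from this patch):
    
        # flink interpreter setting (illustrative values)
        flink.execution.mode   yarn-application
        HADOOP_CONF_DIR        /etc/hadoop/conf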
    
    ### What type of PR is it?
    [Feature | Documentation]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5320
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4098 from zjffdu/ZEPPELIN-5320 and squashes the following commits:
    
    c32905a51 [Jeff Zhang] update
    c2c3671ad [Jeff Zhang] update flink
    96a3ab46c [Jeff Zhang] update k8s
    844908552 [Jeff Zhang] update
    37789fcf4 [Jeff Zhang] update
    bc0c5816d [Jeff Zhang] update
    7be899aea [Jeff Zhang] fix comment
    090e033ed [Jeff Zhang] address comment
    78ad43d57 [Jeff Zhang] [ZEPPELIN-5320]. Support yarn application mode for flink interpreter
---
 bin/interpreter.sh                                 |  4 +
 conf/log4j2.properties                             |  4 -
 docs/interpreter/flink.md                          | 17 +++-
 .../org/apache/zeppelin/flink/Flink112Shims.java   | 36 +++++---
 .../org/apache/zeppelin/flink/HadoopUtils.java     | 27 +++++-
 .../java/org/apache/zeppelin/flink/JobManager.java | 15 ++--
 .../flink/YarnApplicationExecutionEnvironment.java | 89 ++++++++++++++++++++
 .../flink/YarnApplicationStreamEnvironment.java    | 93 +++++++++++++++++++++
 .../org/apache/zeppelin/flink/FlinkILoop.scala     | 88 ++++++++++++++++++--
 .../zeppelin/flink/FlinkScalaInterpreter.scala     | 97 +++++++++++++++++-----
 .../org/apache/zeppelin/flink/FlinkShell.scala     |  8 +-
 .../zeppelin/integration/FlinkIntegrationTest.java | 22 +++++
 .../launcher/FlinkInterpreterLauncher.java         | 93 +++++++++++++++++++--
 .../remote/ExecRemoteInterpreterProcess.java       | 19 ++++-
 14 files changed, 545 insertions(+), 67 deletions(-)

diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 663eca4..169d51a 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -248,6 +248,7 @@ elif [[ "${INTERPRETER_ID}" == "flink" ]]; then
 
   FLINK_PYTHON_JAR=$(find "${FLINK_HOME}/opt" -name 'flink-python_*.jar')
   ZEPPELIN_INTP_CLASSPATH+=":${FLINK_PYTHON_JAR}"
+  FLINK_APP_JAR="$(ls "${ZEPPELIN_HOME}"/interpreter/flink/zeppelin-flink-*.jar)"
 
   if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
     ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
@@ -299,6 +300,9 @@ if [[ -n "${SPARK_SUBMIT}" ]]; then
   else
     INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
   fi
+elif [[ "${ZEPPELIN_FLINK_YARN_APPLICATION}" == "true" ]]; then
+  IFS=' ' read -r -a ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY <<< "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF}"
+  INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "yarn-application" "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
 else
   IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}"
   IFS=' ' read -r -a ZEPPELIN_INTP_MEM_ARRAY <<< "${ZEPPELIN_INTP_MEM}"
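
For context, a hedged sketch of how the new yarn-application branch could expand at run time, assuming ZEPPELIN_SERVER resolves to the remote interpreter server main class and using placeholder values for the jar version, -D conf entries, callback host and ports:

  "${FLINK_HOME}/bin/flink" run-application \
    -c org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer \
    -t yarn-application \
    -D yarn.application.name=flink_note \
    "${ZEPPELIN_HOME}/interpreter/flink/zeppelin-flink-0.10.0-SNAPSHOT.jar" \
    <callback-host> 12320 flink-shared_process 12321
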
diff --git a/conf/log4j2.properties b/conf/log4j2.properties
index 8e6f949..353ca40 100644
--- a/conf/log4j2.properties
+++ b/conf/log4j2.properties
@@ -36,10 +36,6 @@ logger.hadoop.level = INFO
 logger.zookeeper.name = org.apache.zookeeper
 logger.zookeeper.level = INFO
 
-logger.flink.name = org.apache.zeppelin.flink
-logger.flink.level = DEBUG
-
-
 # Log all infos in the given file
 appender.main.name = MainAppender
 appender.main.type = File
diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index dcfc625..40d8d6f 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -259,13 +259,14 @@ There are 2 planners supported by Flink's table api: `flink` & `blink`.
 Check this [page](https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/common.html#main-differences-between-the-two-planners) for the difference between flink planner and blink planner.
 
 
-## Execution mode (Local/Remote/Yarn)
+## Execution mode (Local/Remote/Yarn/Yarn Application)
 
-Flink in Zeppelin supports 3 execution modes (`flink.execution.mode`):
+Flink in Zeppelin supports 4 execution modes (`flink.execution.mode`):
 
 * Local
 * Remote
 * Yarn
+* Yarn Application
 
 ### Run Flink in Local Mode
 
@@ -283,7 +284,17 @@ Running Flink in remote mode will connect to an existing flink cluster which cou
 In order to run flink in Yarn mode, you need to make the following settings:
 
 * Set `flink.execution.mode` to `yarn`
-* Set `HADOOP_CONF_DIR` in flink's interpreter setting.
+* Set `HADOOP_CONF_DIR` in flink's interpreter setting or `zeppelin-env.sh`.
+* Make sure the `hadoop` command is on your `PATH`, because internally Flink will call `hadoop classpath` and load all the Hadoop-related jars into the flink interpreter process.
+
+### Run Flink in Yarn Application Mode
+
+In the yarn mode described above, each flink interpreter runs as a separate process on the Zeppelin server host. This may exhaust resources there when there are many interpreter processes.
+So it is recommended to use yarn application mode if you are using Flink 1.11 or later (yarn application mode is only supported in Flink 1.11 and later). In this mode the flink interpreter runs inside the JobManager, which runs in a yarn container.
+In order to run flink in yarn application mode, you need to make the following settings:
+
+* Set `flink.execution.mode` to `yarn-application`
+* Set `HADOOP_CONF_DIR` in flink's interpreter setting or `zeppelin-env.sh`.
 * Make sure the `hadoop` command is on your `PATH`, because internally Flink will call `hadoop classpath` and load all the Hadoop-related jars into the flink interpreter process.
 
 
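
A hedged example of the zeppelin-env.sh entries the settings above refer to (the paths are placeholders):

  export HADOOP_CONF_DIR=/etc/hadoop/conf
  export FLINK_HOME=/opt/flink-1.12.2
  # keep `hadoop classpath` resolvable for the flink interpreter process
  export PATH=/opt/hadoop/bin:$PATH
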
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index f5dcef9..4f6b698 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -20,6 +20,7 @@ package org.apache.zeppelin.flink;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -173,21 +174,32 @@ public class Flink112Shims extends FlinkShims {
 
   @Override
   public String getPyFlinkPythonPath(Properties properties) throws IOException {
+    String mode = properties.getProperty("flink.execution.mode");
+    if ("yarn-application".equalsIgnoreCase(mode)) {
+      // for yarn application mode, FLINK_HOME is container working directory
+      String flinkHome = new File(".").getAbsolutePath();
+      return getPyFlinkPythonPath(flinkHome + "/lib/python");
+    }
+
     String flinkHome = System.getenv("FLINK_HOME");
-    if (flinkHome != null) {
-      List<File> depFiles = null;
-      depFiles = Arrays.asList(new File(flinkHome + "/opt/python").listFiles());
-      StringBuilder builder = new StringBuilder();
-      for (File file : depFiles) {
-        LOGGER.info("Adding extracted file to PYTHONPATH: " + file.getAbsolutePath());
-        builder.append(file.getAbsolutePath() + ":");
-      }
-      return builder.toString();
+    if (StringUtils.isNotBlank(flinkHome)) {
+      return getPyFlinkPythonPath(flinkHome + "/opt/python");
     } else {
       throw new IOException("No FLINK_HOME is specified");
     }
   }
 
+  private String getPyFlinkPythonPath(String pyFlinkFolder) {
+    LOGGER.info("Getting pyflink lib from {}", pyFlinkFolder);
+    List<File> depFiles = Arrays.asList(new File(pyFlinkFolder).listFiles());
+    StringBuilder builder = new StringBuilder();
+    for (File file : depFiles) {
+      LOGGER.info("Adding extracted file {} to PYTHONPATH", file.getAbsolutePath());
+      builder.append(file.getAbsolutePath() + ":");
+    }
+    return builder.toString();
+  }
+
   @Override
   public Object getCollectStreamTableSink(InetAddress targetAddress, int targetPort, Object serializer) {
     return new CollectStreamTableSink(targetAddress, targetPort, (TypeSerializer<Tuple2<Boolean, Row>>) serializer);
@@ -445,7 +457,11 @@ public class Flink112Shims extends FlinkShims {
     Map<String, ConfigOption> configOptions = new HashMap<>();
     configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
     configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(PythonOptions.class));
+    try {
+      configOptions.putAll(extractConfigOptions(PythonOptions.class));
+    } catch (NoClassDefFoundError e) {
+      LOGGER.warn("No pyflink jars found");
+    }
     configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
     return configOptions;
   }
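
A hedged sketch of the directories the two getPyFlinkPythonPath branches above end up scanning (only the directories come from the code; the archive names are illustrative):

  # yarn-application mode: the yarn container working directory acts as FLINK_HOME
  PYTHONPATH=./lib/python/pyflink.zip:./lib/python/py4j-0.10.8.1-src.zip:
  # other modes: the local flink distribution
  PYTHONPATH=$FLINK_HOME/opt/python/pyflink.zip:$FLINK_HOME/opt/python/py4j-0.10.8.1-src.zip:
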
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
index 85d33f0..d8b7e59 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/HadoopUtils.java
@@ -24,9 +24,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,8 +37,8 @@ import java.io.IOException;
 
 /**
  * Move the hadoop related operation (depends on hadoop api) out of FlinkScalaInterpreter to this
- * class is because in this way we don't need to load hadoop class for non-yarn mode. Otherwise
- * even in non-yarn mode, user still need hadoop shaded jar which doesnt' make sense.
+ * class. This way we don't need to load hadoop classes in non-yarn mode. Otherwise,
+ * even in non-yarn mode, users would still need the hadoop shaded jar, which doesn't make sense.
  */
 public class HadoopUtils {
 
@@ -44,6 +46,24 @@ public class HadoopUtils {
 
   public static String getYarnAppTrackingUrl(ClusterClient clusterClient) throws IOException, YarnException {
     ApplicationId yarnAppId = (ApplicationId) clusterClient.getClusterId();
+    return getYarnAppTrackingUrl(yarnAppId);
+  }
+
+  public static String getYarnAppTrackingUrl(String yarnAppIdStr) throws IOException, YarnException {
+    ApplicationId yarnAppId = ConverterUtils.toApplicationId(yarnAppIdStr);
+    return getYarnAppTrackingUrl(yarnAppId);
+  }
+
+  public static String getYarnAppTrackingUrl(ApplicationId yarnAppId) throws IOException, YarnException {
+    return getYarnApplicationReport(yarnAppId).getTrackingUrl();
+  }
+
+  public static int getFlinkRestPort(String yarnAppId) throws IOException, YarnException {
+    return getYarnApplicationReport(ConverterUtils.toApplicationId(yarnAppId)).getRpcPort();
+  }
+
+  private static ApplicationReport getYarnApplicationReport(ApplicationId yarnAppId)
+          throws IOException, YarnException {
     YarnClient yarnClient = YarnClient.createYarnClient();
     YarnConfiguration yarnConf = new YarnConfiguration();
     // disable timeline service as we only query yarn app here.
@@ -52,7 +72,7 @@ public class HadoopUtils {
     yarnConf.set("yarn.timeline-service.enabled", "false");
     yarnClient.init(yarnConf);
     yarnClient.start();
-    return yarnClient.getApplicationReport(yarnAppId).getTrackingUrl();
+    return yarnClient.getApplicationReport(yarnAppId);
   }
 
   public static void cleanupStagingDirInternal(ClusterClient clusterClient) {
@@ -77,6 +97,7 @@ public class HadoopUtils {
     }
     Path destPath = new Path(tmpDir.getAbsolutePath() + "/" + sourcePath.getName());
     fs.copyToLocalFile(sourcePath, destPath);
+    LOGGER.info("Downloaded jar from {} to {}", jarOnHdfs, destPath);
     return new File(destPath.toString()).getAbsolutePath();
   }
 }
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
index d29221f..1704deb 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -51,17 +51,19 @@ public class JobManager {
           new ConcurrentHashMap<>();
   private FlinkZeppelinContext z;
   private String flinkWebUrl;
-  private String replacedFlinkWebUrl;
+  private String displayedFlinkWebUrl;
   private Properties properties;
 
   public JobManager(FlinkZeppelinContext z,
                     String flinkWebUrl,
-                    String replacedFlinkWebUrl,
+                    String displayedFlinkWebUrl,
                     Properties properties) {
     this.z = z;
     this.flinkWebUrl = flinkWebUrl;
-    this.replacedFlinkWebUrl = replacedFlinkWebUrl;
+    this.displayedFlinkWebUrl = displayedFlinkWebUrl;
     this.properties = properties;
+    LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: {}",
+            flinkWebUrl, displayedFlinkWebUrl);
   }
 
   public void addJob(InterpreterContext context, JobClient jobClient) {
@@ -95,12 +97,7 @@ public class JobManager {
   public void sendFlinkJobUrl(InterpreterContext context) {
     JobClient jobClient = jobs.get(context.getParagraphId());
     if (jobClient != null) {
-      String jobUrl = null;
-      if (replacedFlinkWebUrl != null) {
-        jobUrl = replacedFlinkWebUrl + "#/job/" + jobClient.getJobID();
-      } else {
-        jobUrl = flinkWebUrl + "#/job/" + jobClient.getJobID();
-      }
+      String jobUrl = displayedFlinkWebUrl + "#/job/" + jobClient.getJobID();
       Map<String, String> infos = new HashMap<>();
       infos.put("jobUrl", jobUrl);
       infos.put("label", "FLINK JOB");
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
new file mode 100644
index 0000000..2cdfa9e
--- /dev/null
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+
+/**
+ * ExecutionEnvironment used for yarn application mode.
+ * Need to add jars of scala shell before submitting jobs.
+ */
+public class YarnApplicationExecutionEnvironment extends ExecutionEnvironment {
+
+  private FlinkILoop flinkILoop;
+  private FlinkScalaInterpreter flinkScalaInterpreter;
+
+  public YarnApplicationExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
+                                             Configuration configuration,
+                                             ClassLoader userClassloader,
+                                             FlinkILoop flinkILoop,
+                                             FlinkScalaInterpreter flinkScalaInterpreter) {
+    super(executorServiceLoader,configuration,userClassloader);
+    this.flinkILoop = flinkILoop;
+    this.flinkScalaInterpreter = flinkScalaInterpreter;
+  }
+
+  @Override
+  public JobClient executeAsync(String jobName) throws Exception {
+    updateDependencies();
+    return super.executeAsync(jobName);
+  }
+
+  @Override
+  public JobExecutionResult execute() throws Exception {
+    updateDependencies();
+    return super.execute();
+  }
+
+  private void updateDependencies() throws Exception {
+    final Configuration configuration = getConfiguration();
+    checkState(
+            configuration.getBoolean(DeploymentOptions.ATTACHED),
+            "Only ATTACHED mode is supported by the scala shell.");
+
+    final List<URL> updatedJarFiles = getUpdatedJarFiles();
+    ConfigUtils.encodeCollectionToConfig(
+            configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString);
+  }
+
+  private List<URL> getUpdatedJarFiles() throws MalformedURLException {
+    final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
+    final List<URL> allJarFiles = new ArrayList<>();
+    allJarFiles.add(jarUrl);
+    for (String jar : flinkScalaInterpreter.getUserJars()) {
+      allJarFiles.add(new File(jar).toURI().toURL());
+    }
+    return allJarFiles;
+  }
+}
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
new file mode 100644
index 0000000..f7f3dcb
--- /dev/null
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.ConfigUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+
+/**
+ * StreamExecutionEnvironment used for yarn application mode.
+ * Need to add jars of scala shell before submitting jobs.
+ */
+public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(YarnApplicationStreamEnvironment.class);
+
+  private FlinkILoop flinkILoop;
+  private FlinkScalaInterpreter flinkScalaInterpreter;
+
+  public YarnApplicationStreamEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
+                                          Configuration configuration,
+                                          ClassLoader userClassloader,
+                                          FlinkILoop flinkILoop,
+                                          FlinkScalaInterpreter flinkScalaInterpreter) {
+    super(executorServiceLoader,configuration,userClassloader);
+    this.flinkILoop = flinkILoop;
+    this.flinkScalaInterpreter = flinkScalaInterpreter;
+  }
+
+  @Override
+  public JobExecutionResult execute() throws Exception {
+    updateDependencies();
+    return super.execute();
+  }
+
+  @Override
+  public JobClient executeAsync(String jobName) throws Exception {
+    updateDependencies();
+    return super.executeAsync(jobName);
+  }
+
+  private void updateDependencies() throws Exception {
+    final Configuration configuration = getConfiguration();
+    checkState(
+            configuration.getBoolean(DeploymentOptions.ATTACHED),
+            "Only ATTACHED mode is supported by the scala shell.");
+
+    final List<URL> updatedJarFiles = getUpdatedJarFiles();
+    ConfigUtils.encodeCollectionToConfig(
+            configuration, PipelineOptions.JARS, updatedJarFiles, URL::toString);
+  }
+
+  private List<URL> getUpdatedJarFiles() throws MalformedURLException {
+    final URL jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
+    final List<URL> allJarFiles = new ArrayList<>();
+    allJarFiles.add(jarUrl);
+    for (String jar : flinkScalaInterpreter.getUserJars()) {
+      allJarFiles.add(new File(jar).toURI().toURL());
+    }
+    return allJarFiles;
+  }
+}
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoop.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoop.scala
index ee7c58c..5a96e32 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoop.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkILoop.scala
@@ -20,7 +20,13 @@ package org.apache.zeppelin.flink
 
 import java.io.{BufferedReader, File}
 
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.api.java.{ScalaShellEnvironment, ScalaShellStreamEnvironment, ExecutionEnvironment => JExecutionEnvironment}
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
+import org.apache.zeppelin.flink.FlinkShell.ExecutionMode
 
 import scala.tools.nsc.interpreter._
 
@@ -29,17 +35,83 @@ class FlinkILoop(
     override val flinkConfig: Configuration,
     override val externalJars: Option[Array[String]],
     in0: Option[BufferedReader],
-    out0: JPrintWriter) extends org.apache.flink.api.scala.FlinkILoop(flinkConfig, externalJars, in0, out0) {
+    out0: JPrintWriter,
+    mode: ExecutionMode.Value,
+    jenv: JExecutionEnvironment,
+    jsenv: JStreamExecutionEnvironment,
+    flinkScalaInterpreter: FlinkScalaInterpreter) extends org.apache.flink.api.scala.FlinkILoop(flinkConfig, externalJars, in0, out0) {
+
 
   override def writeFilesToDisk(): File = {
-    // create tmpDirBase again in case it is deleted by system, because it is in the system temp folder.
-    val field = getClass.getSuperclass.getDeclaredField("tmpDirBase")
-    field.setAccessible(true)
-    val tmpDir = field.get(this).asInstanceOf[File]
-    if (!tmpDir.exists()) {
-      tmpDir.mkdir()
+    if (mode == ExecutionMode.YARN_APPLICATION) {
+      // write jars to current working directory in yarn application mode.
+      val tmpDirShellField = classOf[org.apache.flink.api.scala.FlinkILoop]
+        .getDeclaredField("org$apache$flink$api$scala$FlinkILoop$$tmpDirShell")
+      tmpDirShellField.setAccessible(true)
+      tmpDirShellField.set(this, new File("scala_shell_commands"));
+
+      val tmpJarShellField = classOf[org.apache.flink.api.scala.FlinkILoop].getDeclaredField("tmpJarShell")
+      tmpJarShellField.setAccessible(true)
+      tmpJarShellField.set(this, new File("scala_shell_commands.jar"));
+    } else {
+      // create tmpDirBase again in case it is deleted by system, because it is in the system temp folder.
+      val field = classOf[org.apache.flink.api.scala.FlinkILoop].getDeclaredField("tmpDirBase")
+      field.setAccessible(true)
+      val tmpDir = field.get(this).asInstanceOf[File]
+      if (!tmpDir.exists()) {
+        tmpDir.mkdir()
+      }
     }
+
     super.writeFilesToDisk()
   }
-}
 
+  override val (
+    scalaBenv: ExecutionEnvironment,
+    scalaSenv: StreamExecutionEnvironment
+    ) = {
+    if (mode == ExecutionMode.YARN_APPLICATION) {
+      // For yarn application mode, ExecutionEnvironment & StreamExecutionEnvironment have already been created
+      // by flink itself; here we just get their internals via reflection and reconstruct them.
+      val scalaBenv = new ExecutionEnvironment(new YarnApplicationExecutionEnvironment(
+        getExecutionEnvironmentField(jenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
+        getExecutionEnvironmentField(jenv, "configuration").asInstanceOf[Configuration],
+        getExecutionEnvironmentField(jenv, "userClassloader").asInstanceOf[ClassLoader],
+        this,
+        flinkScalaInterpreter
+      ))
+      val scalaSenv = new StreamExecutionEnvironment(new YarnApplicationStreamEnvironment(
+        getStreamExecutionEnvironmentField(jsenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
+        getStreamExecutionEnvironmentField(jsenv, "configuration").asInstanceOf[Configuration],
+        getStreamExecutionEnvironmentField(jsenv, "userClassloader").asInstanceOf[ClassLoader],
+        this,
+        flinkScalaInterpreter
+      ))
+      (scalaBenv, scalaSenv)
+    } else {
+      val scalaBenv = new ExecutionEnvironment(
+        getSuperFlinkILoopField("remoteBenv").asInstanceOf[ScalaShellEnvironment])
+      val scalaSenv = new StreamExecutionEnvironment(
+        getSuperFlinkILoopField("remoteSenv").asInstanceOf[ScalaShellStreamEnvironment])
+      (scalaBenv, scalaSenv)
+    }
+  }
+
+  private def getExecutionEnvironmentField(obj: Object, name: String): Object = {
+    val field = classOf[JExecutionEnvironment].getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(obj)
+  }
+
+  private def getStreamExecutionEnvironmentField(obj: Object, name: String): Object = {
+    val field = classOf[JStreamExecutionEnvironment].getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(obj)
+  }
+
+  private def getSuperFlinkILoopField(name: String): Object = {
+    val field = classOf[org.apache.flink.api.scala.FlinkILoop].getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(this)
+  }
+}
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 25bfcfc..043f3d4 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -53,7 +53,7 @@ import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
 import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterException, InterpreterHookRegistry, InterpreterResult}
 import org.slf4j.{Logger, LoggerFactory}
 
-import scala.collection.{JavaConversions, JavaConverters}
+import scala.collection.JavaConversions
 import scala.collection.JavaConverters._
 import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter.Completion.ScalaCompleter
@@ -91,23 +91,31 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   // PyFlink depends on java version of TableEnvironment,
   // so need to create java version of TableEnvironment
+  // java version of blink TableEnvironment
   private var java_btenv: TableEnvironment = _
   private var java_stenv: TableEnvironment = _
 
+  // java version of flink TableEnvironment
   private var java_btenv_2: TableEnvironment = _
   private var java_stenv_2: TableEnvironment = _
 
   private var z: FlinkZeppelinContext = _
   private var flinkVersion: FlinkVersion = _
   private var flinkShims: FlinkShims = _
+  // used for calling jm rest api
   private var jmWebUrl: String = _
-  private var replacedJMWebUrl: String = _
+  // used for displaying on zeppelin ui
+  private var displayedJMWebUrl: String = _
   private var jobManager: JobManager = _
   private var defaultParallelism = 1
   private var defaultSqlParallelism = 1
+
+  // flink.execution.jars + flink.execution.packages + flink.udf.jars
   private var userJars: Seq[String] = _
+  // flink.udf.jars
   private var userUdfJars: Seq[String] = _
 
+
   def open(): Unit = {
     val config = initFlinkConfig()
     createFlinkILoop(config)
@@ -120,8 +128,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     val modifiers = new java.util.ArrayList[String]()
     modifiers.add("@transient")
     this.bind("z", z.getClass().getCanonicalName(), z, modifiers);
-
-    this.jobManager = new JobManager(this.z, jmWebUrl, replacedJMWebUrl, properties)
+    this.jobManager = new JobManager(this.z, jmWebUrl, displayedJMWebUrl, properties)
 
     // register JobListener
     val jobListener = new FlinkJobListener()
@@ -138,28 +145,60 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
     // load udf jar
     this.userUdfJars.foreach(jar => loadUDFJar(jar))
+
+    if (mode == ExecutionMode.YARN_APPLICATION) {
+      // We have to call senv.execute before running any user code, otherwise yarn application mode
+      // will hit a ClassNotFound issue. Needs more investigation. TODO(zjffdu)
+      val initCode =
+        """
+          |val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
+          |data.flatMap(line => line.split("\\s"))
+          |  .map(w => (w, 1))
+          |  .keyBy(0)
+          |  .sum(1)
+          |  .print
+          |
+          |senv.execute()
+          |""".stripMargin
+
+      interpret(initCode, InterpreterContext.get())
+      InterpreterContext.get().out.clear()
+    }
   }
 
   private def initFlinkConfig(): Config = {
-    val flinkHome = sys.env.getOrElse("FLINK_HOME", "")
-    val flinkConfDir = sys.env.getOrElse("FLINK_CONF_DIR", "")
+
+    this.flinkVersion = new FlinkVersion(EnvironmentInformation.getVersion)
+    LOGGER.info("Using flink: " + flinkVersion)
+    this.flinkShims = FlinkShims.getInstance(flinkVersion, properties)
+
+    var flinkHome = sys.env.getOrElse("FLINK_HOME", "")
+    var flinkConfDir = sys.env.getOrElse("FLINK_CONF_DIR", "")
     val hadoopConfDir = sys.env.getOrElse("HADOOP_CONF_DIR", "")
     val yarnConfDir = sys.env.getOrElse("YARN_CONF_DIR", "")
-    val hiveConfDir = sys.env.getOrElse("HIVE_CONF_DIR", "")
+    var hiveConfDir = sys.env.getOrElse("HIVE_CONF_DIR", "")
+
+    mode = ExecutionMode.withName(
+      properties.getProperty("flink.execution.mode", "LOCAL")
+        .replace("-", "_")
+        .toUpperCase)
+    if (mode == ExecutionMode.YARN_APPLICATION) {
+      if (flinkVersion.isFlink110) {
+        throw new Exception("yarn-application mode is only supported after Flink 1.11")
+      }
+      // use current yarn container working directory as FLINK_HOME, FLINK_CONF_DIR and HIVE_CONF_DIR
+      val workingDirectory = new File(".").getAbsolutePath
+      flinkHome = workingDirectory
+      flinkConfDir = workingDirectory
+      hiveConfDir = workingDirectory
+    }
     LOGGER.info("FLINK_HOME: " + flinkHome)
     LOGGER.info("FLINK_CONF_DIR: " + flinkConfDir)
     LOGGER.info("HADOOP_CONF_DIR: " + hadoopConfDir)
     LOGGER.info("YARN_CONF_DIR: " + yarnConfDir)
     LOGGER.info("HIVE_CONF_DIR: " + hiveConfDir)
 
-    this.flinkVersion = new FlinkVersion(EnvironmentInformation.getVersion)
-    LOGGER.info("Using flink: " + flinkVersion)
-    this.flinkShims = FlinkShims.getInstance(flinkVersion, properties)
-
     this.configuration = GlobalConfiguration.loadConfiguration(flinkConfDir)
-
-    mode = ExecutionMode.withName(
-      properties.getProperty("flink.execution.mode", "LOCAL").toUpperCase)
     var config = Config(executionMode = mode)
     val jmMemory = properties.getProperty("flink.jm.memory", "1024")
     config = config.copy(yarnConfig =
@@ -261,7 +300,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
               // for some cloud vendor, the yarn address may be mapped to some other address.
               val yarnAddress = properties.getProperty("flink.webui.yarn.address")
               if (!StringUtils.isBlank(yarnAddress)) {
-                this.replacedJMWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress)
+                this.displayedJMWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress)
               }
             } else {
               this.jmWebUrl = clusterClient.getWebInterfaceURL
@@ -271,8 +310,20 @@ class FlinkScalaInterpreter(val properties: Properties) {
           }
         case None =>
           // remote mode
-          LOGGER.info("Use FlinkCluster in remote mode")
-          this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
+          if (mode == ExecutionMode.YARN_APPLICATION) {
+            val yarnAppId = System.getenv("_APP_ID");
+            LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
+            this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
+            this.displayedJMWebUrl = HadoopUtils.getYarnAppTrackingUrl(yarnAppId)
+          } else {
+            LOGGER.info("Use FlinkCluster in remote mode")
+            this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
+          }
+      }
+
+      if (this.displayedJMWebUrl == null) {
+        // use jmWebUrl as displayedJMWebUrl if it is not set
+        this.displayedJMWebUrl = this.jmWebUrl
       }
 
       LOGGER.info(s"\nConnecting to Flink cluster: " + this.jmWebUrl)
@@ -286,7 +337,10 @@ class FlinkScalaInterpreter(val properties: Properties) {
         // use FlinkClassLoader to initialize FlinkILoop, otherwise TableFactoryService could not find
         // the TableFactory properly
         Thread.currentThread().setContextClassLoader(getFlinkClassLoader)
-        val repl = new FlinkILoop(configuration, config.externalJars, None, replOut)
+        val repl = new FlinkILoop(configuration, config.externalJars, None, replOut, mode,
+          JExecutionEnvironment.getExecutionEnvironment,
+          JStreamExecutionEnvironment.getExecutionEnvironment,
+          this)
         (repl, cluster)
       } catch {
         case e: Exception =>
@@ -551,7 +605,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
                            context: InterpreterContext): java.util.List[InterpreterCompletion] = {
     val completions = scalaCompleter.complete(buf.substring(0, cursor), cursor).candidates
       .map(e => new InterpreterCompletion(e, e, null))
-    scala.collection.JavaConversions.seqAsJavaList(completions)
+    JavaConversions.seqAsJavaList(completions)
   }
 
   protected def callMethod(obj: Object, name: String): Object = {
@@ -566,7 +620,6 @@ class FlinkScalaInterpreter(val properties: Properties) {
     method.invoke(obj, parameters: _ *)
   }
 
-
   protected def getField(obj: Object, name: String): Object = {
     val field = obj.getClass.getField(name)
     field.setAccessible(true)
@@ -825,7 +878,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
   def getJobManager = this.jobManager
 
   def getFlinkScalaShellLoader: ClassLoader = {
-    val userCodeJarFile = this.flinkILoop.writeFilesToDisk();
+    val userCodeJarFile = this.flinkILoop.writeFilesToDisk()
     new URLClassLoader(Array(userCodeJarFile.toURL) ++ userJars.map(e => new File(e).toURL))
   }
 
@@ -891,6 +944,8 @@ class FlinkScalaInterpreter(val properties: Properties) {
     yarnAddress + remaining
   }
 
+  def getUserJars:java.util.List[String] = JavaConversions.seqAsJavaList(userJars)
+
   private def getConfigurationOfStreamExecutionEnv(): Configuration = {
     val getConfigurationMethod = classOf[JStreamExecutionEnvironment].getDeclaredMethod("getConfiguration")
     getConfigurationMethod.setAccessible(true)
diff --git a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
index e4bcf1b..f26d6b1 100644
--- a/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
+++ b/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkShell.scala
@@ -38,7 +38,7 @@ import scala.collection.mutable.ArrayBuffer
 object FlinkShell {
 
   object ExecutionMode extends Enumeration {
-    val UNDEFINED, LOCAL, REMOTE, YARN = Value
+    val UNDEFINED, LOCAL, REMOTE, YARN, YARN_APPLICATION = Value
   }
 
   /** Configuration object */
@@ -72,7 +72,8 @@ object FlinkShell {
     config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
   }
 
-  @Internal def fetchConnectionInfo(
+  @Internal
+  def fetchConnectionInfo(
       config: Config,
       flinkConfig: Configuration,
       flinkShims: FlinkShims): (Configuration, Option[ClusterClient[_]]) = {
@@ -81,9 +82,10 @@ object FlinkShell {
       case ExecutionMode.LOCAL => createLocalClusterAndConfig(flinkConfig)
       case ExecutionMode.REMOTE => createRemoteConfig(config, flinkConfig)
       case ExecutionMode.YARN => createYarnClusterIfNeededAndGetConfig(config, flinkConfig, flinkShims)
+      case ExecutionMode.YARN_APPLICATION => (flinkConfig, None)
       case ExecutionMode.UNDEFINED => // Wrong input
         throw new IllegalArgumentException("please specify execution mode:\n" +
-          "[local | remote <host> <port> | yarn]")
+          "[local | remote <host> <port> | yarn | yarn-application ]")
     }
   }
 
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index 1e0f61e..21b1f66 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -160,4 +160,26 @@ public abstract class FlinkIntegrationTest {
 
     interpreterSettingManager.close();
   }
+
+  @Test
+  public void testYarnApplicationMode() throws IOException, InterpreterException, YarnException {
+    if (flinkVersion.startsWith("1.10")) {
+      LOGGER.info("Skip yarn application mode test for flink 1.10");
+      return;
+    }
+    InterpreterSetting flinkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("flink");
+    flinkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
+    flinkInterpreterSetting.setProperty("FLINK_HOME", flinkHome);
+    flinkInterpreterSetting.setProperty("PATH", hadoopHome + "/bin:" + System.getenv("PATH"));
+    flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
+    flinkInterpreterSetting.setProperty("flink.execution.mode", "yarn-application");
+    testInterpreterBasics();
+
+    // 1 yarn application launched
+    GetApplicationsRequest request = GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING));
+    GetApplicationsResponse response = hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(request);
+    assertEquals(1, response.getApplicationList().size());
+
+    interpreterSettingManager.close();
+  }
 }
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index 32259c5..f18e569 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -17,16 +17,25 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import com.google.common.base.CharMatcher;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlinkInterpreterLauncher.class);
+
+
   public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
     super(zConf, recoveryStorage);
   }
@@ -35,6 +44,26 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
   public Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context)
           throws IOException {
     Map<String, String> envs = super.buildEnvFromProperties(context);
+
+    String flinkHome = updateEnvsForFlinkHome(envs, context);
+
+    if (!envs.containsKey("FLINK_CONF_DIR")) {
+      envs.put("FLINK_CONF_DIR", flinkHome + "/conf");
+    }
+    envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
+    envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
+
+    // yarn application mode specific logic
+    if ("yarn-application".equalsIgnoreCase(
+            context.getProperties().getProperty("flink.execution.mode"))) {
+      updateEnvsForYarnApplicationMode(envs, context);
+    }
+
+    return envs;
+  }
+
+  private String updateEnvsForFlinkHome(Map<String, String> envs,
+                                      InterpreterLaunchContext context) throws IOException {
     String flinkHome = context.getProperties().getProperty("FLINK_HOME");
     if (StringUtils.isBlank(flinkHome)) {
       flinkHome = System.getenv("FLINK_HOME");
@@ -50,11 +79,65 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
       throw new IOException(String.format("FLINK_HOME '%s' is a file, but should be directory",
               flinkHome));
     }
-    if (!envs.containsKey("FLINK_CONF_DIR")) {
-      envs.put("FLINK_CONF_DIR", flinkHome + "/conf");
+    return flinkHome;
+  }
+
+  private void updateEnvsForYarnApplicationMode(Map<String, String> envs,
+                                                InterpreterLaunchContext context) {
+    envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
+
+    StringBuilder flinkYarnApplicationConfBuilder = new StringBuilder();
+    // set yarn.ship-files
+    List<String> yarnShipFiles = getYarnShipFiles(context);
+    if (!yarnShipFiles.isEmpty()) {
+      flinkYarnApplicationConfBuilder.append(
+              " -D yarn.ship-files=" + yarnShipFiles.stream().collect(Collectors.joining(",")));
     }
-    envs.put("FLINK_LIB_DIR", flinkHome + "/lib");
-    envs.put("FLINK_PLUGINS_DIR", flinkHome + "/plugins");
-    return envs;
+
+    // set yarn.application.name
+    String yarnAppName = context.getProperties().getProperty("flink.yarn.appName");
+    if (StringUtils.isNotBlank(yarnAppName)) {
+      // the flink run command can not contain whitespace, so replace it with _
+      flinkYarnApplicationConfBuilder.append(
+              " -D yarn.application.name=" + yarnAppName.replaceAll(" ", "_") + "");
+    }
+
+    // add other yarn and python configuration.
+    for (Map.Entry<Object, Object> entry : context.getProperties().entrySet()) {
+      String key = entry.getKey().toString();
+      String value = entry.getValue().toString();
+      if (!key.equalsIgnoreCase("yarn.ship-files") &&
+              !key.equalsIgnoreCase("flink.yarn.appName")) {
+        if (CharMatcher.whitespace().matchesAnyOf(value)) {
+          LOGGER.warn("flink configuration key {} is skipped because it contains white space",
+                  key);
+        } else {
+          flinkYarnApplicationConfBuilder.append(" -D " + key + "=" + value);
+        }
+      }
+    }
+    envs.put("ZEPPELIN_FLINK_YARN_APPLICATION_CONF", flinkYarnApplicationConfBuilder.toString());
+  }
+
+  private List<String> getYarnShipFiles(InterpreterLaunchContext context) {
+    // Extract yarn.ship-files, add hive-site.xml automatically if hive is enabled
+    // and HIVE_CONF_DIR is specified
+    List<String> yarnShipFiles = new ArrayList<>();
+    String hiveConfDirProperty = context.getProperties().getProperty("HIVE_CONF_DIR");
+    if (StringUtils.isNotBlank(hiveConfDirProperty) &&
+            Boolean.parseBoolean(context.getProperties()
+                    .getProperty("zeppelin.flink.enableHive", "false"))) {
+      File hiveSiteFile = new File(hiveConfDirProperty, "hive-site.xml");
+      if (hiveSiteFile.isFile() && hiveSiteFile.exists()) {
+        yarnShipFiles.add(hiveSiteFile.getAbsolutePath());
+      } else {
+        LOGGER.warn("Hive site file: {} doesn't exist or is not a directory", hiveSiteFile);
+      }
+    }
+    if (context.getProperties().containsKey("yarn.ship-files")) {
+      yarnShipFiles.add(context.getProperties().getProperty("yarn.ship-files"));
+    }
+
+    return yarnShipFiles;
   }
 }
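
A hedged illustration of what updateEnvsForYarnApplicationMode could put into ZEPPELIN_FLINK_YARN_APPLICATION_CONF for an assumed interpreter setting (property names and values are examples; in practice every whitespace-free property of the setting is forwarded as its own -D entry, only three are shown here):

  # assumed properties:
  #   flink.yarn.appName              = "my flink app"   (whitespace replaced by _)
  #   zeppelin.flink.enableHive       = true, HIVE_CONF_DIR = /etc/hive/conf
  #   taskmanager.memory.process.size = 2048m
  ZEPPELIN_FLINK_YARN_APPLICATION_CONF=' -D yarn.ship-files=/etc/hive/conf/hive-site.xml -D yarn.application.name=my_flink_app -D taskmanager.memory.process.size=2048m'
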
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
index 824cfee..fcabaab 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
@@ -209,11 +209,18 @@ public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProces
     @Override
     public void onProcessComplete(int exitValue) {
       LOGGER.warn("Process is exited with exit value {}", exitValue);
-      if (getEnv().getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", "false").equals("false")) {
+      if (isSparkYarnClusterMode()) {
         // don't call notify in yarn-cluster mode
         synchronized (this) {
           notifyAll();
         }
+      } else if (isFlinkYarnApplicationMode() && exitValue == 0) {
+        // Don't transition the interpreter state when the flink launcher process exits
+        // with 0 in yarn application mode.
+        synchronized (this) {
+          notifyAll();
+        }
+        return;
       }
       // For yarn-cluster mode, client process will exit with exit value 0
       // after submitting spark app. So don't move to TERMINATED state when exitValue
@@ -225,6 +232,16 @@ public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProces
       }
     }
 
+    private boolean isSparkYarnClusterMode() {
+      return Boolean.parseBoolean(
+              getEnv().getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", "false"));
+    }
+
+    private boolean isFlinkYarnApplicationMode() {
+      return Boolean.parseBoolean(
+              getEnv().getOrDefault("ZEPPELIN_FLINK_YARN_APPLICATION", "false"));
+    }
+
     @Override
     public void onProcessFailed(ExecuteException e) {
       super.onProcessFailed(e);