You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/10/11 12:30:08 UTC
[zeppelin] branch master updated: [ZEPPELIN-5352] Support flink in
k8s mode
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 28bc972 [ZEPPELIN-5352] Support flink in k8s mode
28bc972 is described below
commit 28bc9722700905777f2304a843a126c302d2023e
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Oct 8 20:57:29 2021 +0800
[ZEPPELIN-5352] Support flink in k8s mode
### What is this PR for?
Flink has supported Kubernetes native mode since Flink 1.11. This PR adds support for kubernetes-application mode; the implementation is very similar to yarn-application mode.
### What type of PR is it?
[Feature]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5352
### How should this be tested?
* CI passes, and the change was tested manually
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4116 from zjffdu/ZEPPELIN-5352 and squashes the following commits:
983a3793da [Jeff Zhang] address comment
11d0c60af1 [Jeff Zhang] [ZEPPELIN-5352]. Support flink in k8s mode
---
bin/interpreter.sh | 6 +-
...va => ApplicationModeExecutionEnvironment.java} | 6 +-
....java => ApplicationModeStreamEnvironment.java} | 8 +--
.../zeppelin/flink/FlinkScalaInterpreter.scala | 32 ++++++++-
.../zeppelin/flink/internal/FlinkILoop.scala | 9 ++-
.../zeppelin/flink/internal/FlinkShell.scala | 20 +++++-
.../zeppelin/integration/FlinkIntegrationTest.java | 2 +-
.../remote/RemoteInterpreterServer.java | 19 +++--
.../launcher/FlinkInterpreterLauncher.java | 83 +++++++++++++++++-----
.../zeppelin/interpreter/InterpreterSetting.java | 4 ++
.../remote/ExecRemoteInterpreterProcess.java | 9 ++-
11 files changed, 148 insertions(+), 50 deletions(-)
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index c75a299..6d9c048 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -304,9 +304,9 @@ if [[ -n "${SPARK_SUBMIT}" ]]; then
else
INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
fi
-elif [[ "${ZEPPELIN_FLINK_YARN_APPLICATION}" == "true" ]]; then
- IFS='|' read -r -a ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY <<< "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF}"
- INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "yarn-application" "${ZEPPELIN_FLINK_YARN_APPLICATION_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
+elif [[ -n "${ZEPPELIN_FLINK_APPLICATION_MODE}" ]]; then
+ IFS='|' read -r -a ZEPPELIN_FLINK_APPLICATION_MODE_CONF_ARRAY <<< "${ZEPPELIN_FLINK_APPLICATION_MODE_CONF}"
+ INTERPRETER_RUN_COMMAND+=("${FLINK_HOME}/bin/flink" "run-application" "-c" "${ZEPPELIN_SERVER}" "-t" "${ZEPPELIN_FLINK_APPLICATION_MODE}" "${ZEPPELIN_FLINK_APPLICATION_MODE_CONF_ARRAY[@]}" "${FLINK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")
else
IFS=' ' read -r -a JAVA_INTP_OPTS_ARRAY <<< "${JAVA_INTP_OPTS}"
IFS=' ' read -r -a ZEPPELIN_INTP_MEM_ARRAY <<< "${ZEPPELIN_INTP_MEM}"
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
similarity index 94%
rename from flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
index 2cfd0e3..33afa03 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationExecutionEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java
@@ -37,15 +37,15 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
- * ExecutionEnvironment used for yarn application mode.
+ * ExecutionEnvironment used for application mode.
* Need to add jars of scala shell before submitting jobs.
*/
-public class YarnApplicationExecutionEnvironment extends ExecutionEnvironment {
+public class ApplicationModeExecutionEnvironment extends ExecutionEnvironment {
private FlinkILoop flinkILoop;
private FlinkScalaInterpreter flinkScalaInterpreter;
- public YarnApplicationExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
+ public ApplicationModeExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
ClassLoader userClassloader,
FlinkILoop flinkILoop,
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeStreamEnvironment.java
similarity index 93%
rename from flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
rename to flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeStreamEnvironment.java
index 7f2fc92..b86e556 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/YarnApplicationStreamEnvironment.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeStreamEnvironment.java
@@ -40,17 +40,17 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
- * StreamExecutionEnvironment used for yarn application mode.
+ * StreamExecutionEnvironment used for application mode.
* Need to add jars of scala shell before submitting jobs.
*/
-public class YarnApplicationStreamEnvironment extends StreamExecutionEnvironment {
+public class ApplicationModeStreamEnvironment extends StreamExecutionEnvironment {
- private static final Logger LOGGER = LoggerFactory.getLogger(YarnApplicationStreamEnvironment.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationModeStreamEnvironment.class);
private FlinkILoop flinkILoop;
private FlinkScalaInterpreter flinkScalaInterpreter;
- public YarnApplicationStreamEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
+ public ApplicationModeStreamEnvironment(PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
ClassLoader userClassloader,
FlinkILoop flinkILoop,
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 0918084..12051dd 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -18,7 +18,7 @@
package org.apache.zeppelin.flink
-import java.io.File
+import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.nio.file.Files
import java.util.Properties
@@ -45,6 +45,7 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
import org.apache.flink.table.module.hive.HiveModule
import org.apache.flink.yarn.cli.FlinkYarnSessionCli
import org.apache.zeppelin.dep.DependencyResolver
+import org.apache.zeppelin.flink.internal.FlinkShell
import org.apache.zeppelin.flink.internal.FlinkShell._
import org.apache.zeppelin.flink.internal.FlinkILoop
import org.apache.zeppelin.interpreter.Interpreter.FormType
@@ -145,7 +146,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
// load udf jar
this.userUdfJars.foreach(jar => loadUDFJar(jar))
- if (mode == ExecutionMode.YARN_APPLICATION) {
+ if (ExecutionMode.isApplicationMode(mode)) {
// have to call senv.execute method before running any user code, otherwise yarn application mode
// will cause ClassNotFound issue. Needs to do more investigation. TODO(zjffdu)
val initCode =
@@ -185,7 +186,7 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
properties.getProperty("flink.execution.mode", "LOCAL")
.replace("-", "_")
.toUpperCase)
- if (mode == ExecutionMode.YARN_APPLICATION) {
+ if (ExecutionMode.isYarnAppicationMode(mode)) {
if (flinkVersion.isFlink110) {
throw new Exception("yarn-application mode is only supported after Flink 1.11")
}
@@ -195,6 +196,17 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
flinkConfDir = workingDirectory
hiveConfDir = workingDirectory
}
+ if (ExecutionMode.isK8sApplicationMode(mode)) {
+ if (flinkVersion.isFlink110) {
+ throw new Exception("application mode is only supported after Flink 1.11")
+ }
+ // use current pod working directory as FLINK_HOME
+ val workingDirectory = new File(".").getAbsolutePath
+ flinkHome = workingDirectory
+ flinkConfDir = workingDirectory + "/conf"
+ hiveConfDir = workingDirectory + "/conf"
+ }
+
LOGGER.info("FLINK_HOME: " + flinkHome)
LOGGER.info("FLINK_CONF_DIR: " + flinkConfDir)
LOGGER.info("HADOOP_CONF_DIR: " + hadoopConfDir)
@@ -234,7 +246,17 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
.copy(queue = Some(queue))))
this.userUdfJars = getUserUdfJars()
+
this.userJars = getUserJarsExceptUdfJars ++ this.userUdfJars
+ if (ExecutionMode.isK8sApplicationMode(mode)) {
+ var flinkAppJar = properties.getProperty("flink.app.jar")
+ if (flinkAppJar != null && flinkAppJar.startsWith("local://")) {
+ flinkAppJar = flinkAppJar.substring(8)
+ this.userJars = this.userJars :+ flinkAppJar
+ } else {
+ throw new IOException("flink.app.jar is not set or invalid, flink.app.jar: " + flinkAppJar)
+ }
+ }
LOGGER.info("UserJars: " + userJars.mkString(","))
config = config.copy(externalJars = Some(userJars.toArray))
LOGGER.info("Config: " + config)
@@ -317,6 +339,10 @@ abstract class FlinkScalaInterpreter(val properties: Properties,
LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
+ } else if (ExecutionMode.isK8sApplicationMode(mode)) {
+ LOGGER.info("Use FlinkCluster in kubernetes-application mode")
+ this.jmWebUrl = "http://localhost:" + configuration.getInteger("rest.port", 8081)
+ this.displayedJMWebUrl = this.jmWebUrl
} else {
LOGGER.info("Use FlinkCluster in remote mode")
this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
index 86abeb5..1be64ab 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkILoop.scala
@@ -23,13 +23,12 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.AbstractID
import java.io.{BufferedReader, File, FileOutputStream, IOException}
-import java.net.URL
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
import org.apache.flink.api.java.{ExecutionEnvironment => JExecutionEnvironment}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.execution.PipelineExecutorServiceLoader
-import org.apache.zeppelin.flink.{FlinkScalaInterpreter, YarnApplicationExecutionEnvironment, YarnApplicationStreamEnvironment}
+import org.apache.zeppelin.flink.{ApplicationModeExecutionEnvironment, ApplicationModeStreamEnvironment, FlinkScalaInterpreter}
import FlinkShell.ExecutionMode
import scala.tools.nsc.interpreter._
@@ -71,17 +70,17 @@ class FlinkILoop(
scalaBenv: ExecutionEnvironment,
scalaSenv: StreamExecutionEnvironment
) = {
- if (mode == ExecutionMode.YARN_APPLICATION) {
+ if (ExecutionMode.isApplicationMode(mode)) {
// For yarn application mode, ExecutionEnvironment & StreamExecutionEnvironment has already been created
// by flink itself, we here just try get them via reflection and reconstruct them.
- val scalaBenv = new ExecutionEnvironment(new YarnApplicationExecutionEnvironment(
+ val scalaBenv = new ExecutionEnvironment(new ApplicationModeExecutionEnvironment(
getExecutionEnvironmentField(jenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
getExecutionEnvironmentField(jenv, "configuration").asInstanceOf[Configuration],
getExecutionEnvironmentField(jenv, "userClassloader").asInstanceOf[ClassLoader],
this,
flinkScalaInterpreter
))
- val scalaSenv = new StreamExecutionEnvironment(new YarnApplicationStreamEnvironment(
+ val scalaSenv = new StreamExecutionEnvironment(new ApplicationModeStreamEnvironment(
getStreamExecutionEnvironmentField(jsenv, "executorServiceLoader").asInstanceOf[PipelineExecutorServiceLoader],
getStreamExecutionEnvironmentField(jsenv, "configuration").asInstanceOf[Configuration],
getStreamExecutionEnvironmentField(jsenv, "userClassloader").asInstanceOf[ClassLoader],
diff --git a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
index 437d188..ab0f299 100644
--- a/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
+++ b/flink/flink-scala-parent/src/main/scala/org/apache/zeppelin/flink/internal/FlinkShell.scala
@@ -18,7 +18,7 @@
package org.apache.zeppelin.flink.internal
-import java.io.BufferedReader
+import java.io._
import org.apache.flink.annotation.Internal
import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser}
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfigurati
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor
import org.apache.zeppelin.flink.FlinkShims
+
import scala.collection.mutable.ArrayBuffer
/**
@@ -39,7 +40,19 @@ import scala.collection.mutable.ArrayBuffer
object FlinkShell {
object ExecutionMode extends Enumeration {
- val UNDEFINED, LOCAL, REMOTE, YARN, YARN_APPLICATION = Value
+ val UNDEFINED, LOCAL, REMOTE, YARN, YARN_APPLICATION, KUBERNETES_APPLICATION = Value
+
+ def isYarnAppicationMode(mode: ExecutionMode.Value): Boolean = {
+ mode == ExecutionMode.YARN_APPLICATION
+ }
+
+ def isK8sApplicationMode(mode: ExecutionMode.Value): Boolean = {
+ mode == ExecutionMode.KUBERNETES_APPLICATION
+ }
+
+ def isApplicationMode(mode: ExecutionMode.Value): Boolean = {
+ isYarnAppicationMode(mode) || isK8sApplicationMode(mode)
+ }
}
/** Configuration object */
@@ -84,9 +97,10 @@ object FlinkShell {
case ExecutionMode.REMOTE => createRemoteConfig(config, flinkConfig)
case ExecutionMode.YARN => createYarnClusterIfNeededAndGetConfig(config, flinkConfig, flinkShims)
case ExecutionMode.YARN_APPLICATION => (flinkConfig, None)
+ case ExecutionMode.KUBERNETES_APPLICATION => (flinkConfig, None)
case ExecutionMode.UNDEFINED => // Wrong input
throw new IllegalArgumentException("please specify execution mode:\n" +
- "[local | remote <host> <port> | yarn | yarn-application ]")
+ "[local | remote <host> <port> | yarn | yarn-application | kubernetes-application]")
}
}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
index f27acbd..2dc14f8 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/FlinkIntegrationTest.java
@@ -153,7 +153,7 @@ public abstract class FlinkIntegrationTest {
flinkInterpreterSetting.setProperty("FLINK_HOME", flinkHome);
flinkInterpreterSetting.setProperty("PATH", hadoopHome + "/bin:" + System.getenv("PATH"));
flinkInterpreterSetting.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
- flinkInterpreterSetting.setProperty("flink.execution.mode", "YARN");
+ flinkInterpreterSetting.setProperty("flink.execution.mode", "yarn");
flinkInterpreterSetting.setProperty("zeppelin.flink.run.asLoginUser", "false");
testInterpreterBasics();
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index c058dd6..aff8139 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -77,7 +77,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.nio.ByteBuffer;
@@ -142,6 +141,12 @@ public class RemoteInterpreterServer extends Thread
private ScheduledExecutorService resultCleanService = Executors.newSingleThreadScheduledExecutor();
private boolean isTest;
+ // Whether calling System.exit to force shutdown interpreter process.
+ // In Flink K8s application mode, RemoteInterpreterServer#main is called via reflection by flink framework.
+ // We should not call System.exit in this scenario when RemoteInterpreterServer is stopped,
+ // Otherwise flink will think flink job is exited abnormally and will try to restart this
+ // pod (RemoteInterpreterServer)
+ private boolean isForceShutdown = true;
private ZeppelinConfiguration zConf;
// cluster manager client
@@ -321,7 +326,10 @@ public class RemoteInterpreterServer extends Thread
* should be part of the next release and solve the problem.
* We may have other threads that are not terminated successfully.
*/
- System.exit(0);
+ if (remoteInterpreterServer.isForceShutdown) {
+ LOGGER.info("Force shutting down");
+ System.exit(0);
+ }
}
// Submit interpreter process metadata information to cluster metadata
@@ -380,11 +388,14 @@ public class RemoteInterpreterServer extends Thread
replClass.getConstructor(new Class[]{Properties.class});
Interpreter interpreter = constructor.newInstance(p);
interpreter.setClassloaderUrls(new URL[]{});
- LOGGER.info("Instantiate interpreter {}", className);
+
interpreter.setInterpreterGroup(interpreterGroup);
interpreter.setUserName(userName);
interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId);
+
+ this.isForceShutdown = Boolean.parseBoolean(properties.getOrDefault("zeppelin.interpreter.forceShutdown", "true"));
+ LOGGER.info("Instantiate interpreter {}, isForceShutdown: {}", className, isForceShutdown);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw new InterpreterRPCException("Fail to create interpreter, cause: " + e.toString());
@@ -700,7 +711,7 @@ public class RemoteInterpreterServer extends Thread
}
}
- if (server.isServing()) {
+ if (server.isServing() && isForceShutdown) {
LOGGER.info("Force shutting down");
System.exit(1);
}
diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
index 3ff4f56..83a1f24 100644
--- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java
@@ -18,6 +18,7 @@
package org.apache.zeppelin.interpreter.launcher;
import com.google.common.base.CharMatcher;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -26,18 +27,20 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;
+import java.util.Set;
import java.util.stream.Collectors;
public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(FlinkInterpreterLauncher.class);
-
+ private static final Set<String> FLINK_EXECUTION_MODES = Sets.newHashSet(
+ "local", "remote", "yarn", "yarn-application", "kubernetes-application");
public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
super(zConf, recoveryStorage);
@@ -48,8 +51,8 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
throws IOException {
Map<String, String> envs = super.buildEnvFromProperties(context);
- String flinkHome = updateEnvsForFlinkHome(envs, context);
-
+ // update FLINK related environment variables
+ String flinkHome = getFlinkHome(context);
if (!envs.containsKey("FLINK_CONF_DIR")) {
envs.put("FLINK_CONF_DIR", flinkHome + "/conf");
}
@@ -59,14 +62,29 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
normalizeConfiguration(context);
String flinkExecutionMode = context.getProperties().getProperty("flink.execution.mode");
- // yarn application mode specific logic
- if ("yarn-application".equalsIgnoreCase(flinkExecutionMode)) {
- updateEnvsForYarnApplicationMode(envs, context);
+ if (!FLINK_EXECUTION_MODES.contains(flinkExecutionMode)) {
+ throw new IOException("Not valid flink.execution.mode: " +
+ flinkExecutionMode + ", valid modes ares: " +
+ FLINK_EXECUTION_MODES.stream().collect(Collectors.joining(", ")));
+ }
+ // application mode specific logic
+ if (isApplicationMode(flinkExecutionMode)) {
+ updateEnvsForApplicationMode(flinkExecutionMode, envs, context);
}
- String flinkAppJar = chooseFlinkAppJar(flinkHome);
- LOGGER.info("Choose FLINK_APP_JAR: {}", flinkAppJar);
- envs.put("FLINK_APP_JAR", flinkAppJar);
+ if (isK8sApplicationMode(flinkExecutionMode)) {
+ String flinkAppJar = context.getProperties().getProperty("flink.app.jar");
+ if (StringUtils.isBlank(flinkAppJar)) {
+ throw new IOException("flink.app.jar is not specified for kubernetes-application mode");
+ }
+ envs.put("FLINK_APP_JAR", flinkAppJar);
+ LOGGER.info("K8s application's FLINK_APP_JAR : " + flinkAppJar);
+ context.getProperties().put("zeppelin.interpreter.forceShutdown", "false");
+ } else {
+ String flinkAppJar = chooseFlinkAppJar(flinkHome);
+ LOGGER.info("Choose FLINK_APP_JAR for non k8s-application mode: {}", flinkAppJar);
+ envs.put("FLINK_APP_JAR", flinkAppJar);
+ }
if ("yarn".equalsIgnoreCase(flinkExecutionMode) ||
"yarn-application".equalsIgnoreCase(flinkExecutionMode)) {
@@ -148,8 +166,28 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
return flinkScalaJars.get(0).getAbsolutePath();
}
- private String updateEnvsForFlinkHome(Map<String, String> envs,
- InterpreterLaunchContext context) throws IOException {
+ private boolean isApplicationMode(String mode) {
+ return isYarnApplicationMode(mode) || isK8sApplicationMode(mode);
+ }
+
+ private boolean isYarnApplicationMode(String mode) {
+ return "yarn-application".equals(mode);
+ }
+
+ private boolean isK8sApplicationMode(String mode) {
+ return "kubernetes-application".equals(mode);
+ }
+
+ /**
+ * Get FLINK_HOME in the following orders:
+ * 1. FLINK_HOME in interpreter setting
+ * 2. FLINK_HOME in system environment variables.
+ *
+ * @param context
+ * @return
+ * @throws IOException
+ */
+ private String getFlinkHome(InterpreterLaunchContext context) throws IOException {
String flinkHome = context.getProperties().getProperty("FLINK_HOME");
if (StringUtils.isBlank(flinkHome)) {
flinkHome = System.getenv("FLINK_HOME");
@@ -168,11 +206,11 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
return flinkHome;
}
- private void updateEnvsForYarnApplicationMode(Map<String, String> envs,
- InterpreterLaunchContext context)
- throws IOException {
-
- envs.put("ZEPPELIN_FLINK_YARN_APPLICATION", "true");
+ private void updateEnvsForApplicationMode(String mode,
+ Map<String, String> envs,
+ InterpreterLaunchContext context) throws IOException {
+ // ZEPPELIN_FLINK_APPLICATION_MODE is used in interpreter.sh
+ envs.put("ZEPPELIN_FLINK_APPLICATION_MODE", mode);
StringJoiner flinkConfStringJoiner = new StringJoiner("|");
// set yarn.ship-files
@@ -190,7 +228,7 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
flinkConfStringJoiner.add("yarn.application.name=" + yarnAppName);
}
- // add other yarn and python configuration.
+ // add other configuration for both k8s and yarn
for (Map.Entry<Object, Object> entry : context.getProperties().entrySet()) {
String key = entry.getKey().toString();
String value = entry.getValue().toString();
@@ -205,9 +243,16 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher {
}
}
}
- envs.put("ZEPPELIN_FLINK_YARN_APPLICATION_CONF", flinkConfStringJoiner.toString());
+ envs.put("ZEPPELIN_FLINK_APPLICATION_MODE_CONF", flinkConfStringJoiner.toString());
}
+ /**
+ * Used in yarn-application mode.
+ *
+ * @param context
+ * @return
+ * @throws IOException
+ */
private List<String> getYarnShipFiles(InterpreterLaunchContext context) throws IOException {
// Extract yarn.ship-files, add hive-site.xml automatically if hive is enabled
// and HIVE_CONF_DIR is specified
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 3b0728e..bff9273 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -762,6 +762,10 @@ public class InterpreterSetting {
public String getLauncherPlugin(Properties properties) {
if (isRunningOnKubernetes()) {
+ if (group.equals("flink")) {
+ // Flink has its own implementation of k8s mode.
+ return "FlinkInterpreterLauncher";
+ }
return "K8sStandardInterpreterLauncher";
} else if (isRunningOnCluster()) {
return InterpreterSetting.CLUSTER_INTERPRETER_LAUNCHER_NAME;
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
index f7b85a2..3b2142d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java
@@ -229,9 +229,9 @@ public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProces
synchronized (this) {
notifyAll();
}
- } else if (isFlinkYarnApplicationMode() && exitValue == 0) {
+ } else if (isFlinkApplicationMode() && exitValue == 0) {
// Don't update transition state when flink launcher process exist
- // in yarn application mode.
+ // in flink application mode.
synchronized (this) {
notifyAll();
}
@@ -252,9 +252,8 @@ public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProces
getEnv().getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", "false"));
}
- private boolean isFlinkYarnApplicationMode() {
- return Boolean.parseBoolean(
- getEnv().getOrDefault("ZEPPELIN_FLINK_YARN_APPLICATION", "false"));
+ private boolean isFlinkApplicationMode() {
+ return getEnv().containsKey("ZEPPELIN_FLINK_APPLICATION_MODE");
}
@Override