You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/05/05 11:22:01 UTC
[incubator-linkis] branch dev-1.1.2 updated: Flexible custom Flink configuration and resources to Flink Engine (#2074)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 56c5be47c Flexible custom Flink configuration and resources to Flink Engine (#2074)
56c5be47c is described below
commit 56c5be47cfa915f8be531d30d3e5b5282f749626
Author: David hua <da...@hotmail.com>
AuthorDate: Thu May 5 19:21:55 2022 +0800
Flexible custom Flink configuration and resources to Flink Engine (#2074)
* Update StringUtils imports from commons-lang to commons-lang3.
* Add new feature for Flink EngineConn to support kerberos and TriggerSavepointOperator.
* Optimize Flink EngineConn to support kerberos contexts.
* 1. add the support of extra flink configs.
2. fix the bug of AppMaster abnormal exit in Flink application mode.
* Change the unit of flink yarn queue engine resource from 'G' to 'M'
* Define the YarnConfigOptions.SHIP_FILES to upload extended file/directory resources; Define the flink params blank placeholder;
* Optimize the flink engine code with "spotless" #2073
Co-authored-by: wushengyeyouya <69...@qq.com>
---
.../deployment/ClusterDescriptorAdapter.java | 21 ++++-
.../YarnApplicationClusterDescriptorAdapter.java | 2 +-
.../flink/client/result/ChangelogResult.java | 2 +-
.../flink/config/FlinkEnvConfiguration.scala | 11 +++
.../flink/config/FlinkResourceConfiguration.scala | 6 +-
.../flink/context/EnvironmentContext.scala | 2 +-
.../flink/executor/FlinkCodeOnceExecutor.scala | 2 +-
.../flink/executor/FlinkJarOnceExecutor.scala | 2 +-
.../flink/executor/FlinkOnceExecutor.scala | 17 ++--
.../flink/factory/FlinkEngineConnFactory.scala | 94 +++++++++++++++-------
.../launch/FlinkEngineConnLaunchBuilder.scala | 2 +-
.../flink/operator/TriggerSavepointOperator.scala | 43 ++++++++++
.../resource/FlinkEngineConnResourceFactory.scala | 6 +-
13 files changed, 159 insertions(+), 51 deletions(-)
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
index 38ae70b4c..c8a8090a5 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
@@ -21,7 +21,7 @@ import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration;
import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterRetrieveException;
@@ -97,6 +97,25 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
bridgeClientRequest(this.executionContext, jobId, () -> clusterClient.cancel(jobId), true);
}
+ public String doSavepoint(String savepoint, String mode) throws JobExecutionException {
+ LOG.info("try to {} savepoint in path {}.", mode, savepoint);
+ Supplier<CompletableFuture<String>> function;
+ switch (mode) {
+ case "trigger":
+ function = () -> clusterClient.triggerSavepoint(jobId, savepoint);
+ break;
+ case "cancel":
+ function = () -> clusterClient.cancelWithSavepoint(jobId, savepoint);
+ break;
+ case "stop":
+ function = () -> clusterClient.stopWithSavepoint(jobId, false, savepoint);
+ break;
+ default:
+ throw new JobExecutionException("not supported savepoint operator mode " + mode);
+ }
+ return bridgeClientRequest(this.executionContext, jobId, function, false);
+ }
+
/**
* The reason of using ClusterClient instead of JobClient to retrieve a cluster is the JobClient
* can't know whether the job is finished on yarn-per-job mode.
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/YarnApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/YarnApplicationClusterDescriptorAdapter.java
index e46bd70a4..0b1fc64a6 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/YarnApplicationClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/YarnApplicationClusterDescriptorAdapter.java
@@ -20,7 +20,7 @@ package org.apache.linkis.engineconnplugin.flink.client.deployment;
import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java
index cf1ad8ce3..160a1f8e3 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java
@@ -20,7 +20,7 @@ package org.apache.linkis.engineconnplugin.flink.client.result;
import org.apache.linkis.engineconnplugin.flink.exception.SqlExecutionException;
import org.apache.linkis.engineconnplugin.flink.listener.RowsType;
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index 2551cd31f..83b504356 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -24,6 +24,7 @@ import org.apache.linkis.engineconnplugin.flink.client.config.entries.ExecutionE
object FlinkEnvConfiguration {
+ val SPARK_LIB_PATH = CommonVars("spark.lib.path", CommonVars("SPARK_HOME", "/appcom/Install/spark").getValue + "/jars")
val FLINK_HOME_ENV = "FLINK_HOME"
val FLINK_CONF_DIR_ENV = "FLINK_CONF_DIR"
val FLINK_VERSION = CommonVars("flink.version", "1.12.2")
@@ -64,4 +65,14 @@ object FlinkEnvConfiguration {
val FLINK_REPORTER_CLASS = CommonVars("linkis.flink.reporter.class", "")
val FLINK_REPORTER_INTERVAL = CommonVars("linkis.flink.reporter.interval", new TimeType("60s"))
+ val FLINK_EXECUTION_ATTACHED = CommonVars("linkis.flink.execution.attached", true)
+ val FLINK_CONFIG_PREFIX = "_FLINK_CONFIG_."
+
+ val FLINK_KERBEROS_ENABLE = CommonVars("linkis.flink.kerberos.enable", false)
+ val FLINK_KERBEROS_LOGIN_CONTEXTS = CommonVars("linkis.flink.kerberos.login.contexts", "Client,KafkaClient")
+ val FLINK_KERBEROS_LOGIN_KEYTAB = CommonVars("linkis.flink.kerberos.login.keytab", "")
+ val FLINK_KERBEROS_LOGIN_PRINCIPAL = CommonVars("linkis.flink.kerberos.login.principal", "")
+ val FLINK_KERBEROS_CONF_PATH = CommonVars("linkis.flink.kerberos.krb5-conf.path", "")
+
+ val FLINK_PARAMS_BLANK_PLACEHOLER = CommonVars("linkis.flink.params.placeholder.blank", "\\0x001")
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
index 31ac1663d..dc6b0692d 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
@@ -22,12 +22,12 @@ import org.apache.linkis.common.conf.CommonVars
object FlinkResourceConfiguration {
- val LINKIS_FLINK_CLIENT_MEMORY = CommonVars[Int]("flink.client.memory", 4) //Unit: G(单位为G)
+ val LINKIS_FLINK_CLIENT_MEMORY = CommonVars[Int]("flink.client.memory", 1024) //Unit: M(单位为M)
val LINKIS_FLINK_CLIENT_CORES = 1 //Fixed to 1(固定为1) CommonVars[Int]("wds.linkis.driver.cores", 1)
- val LINKIS_FLINK_JOB_MANAGER_MEMORY = CommonVars[Int]("flink.jobmanager.memory", 2) //Unit: G(单位为G)
- val LINKIS_FLINK_TASK_MANAGER_MEMORY = CommonVars[Int]("flink.taskmanager.memory", 4) //Unit: G(单位为G)
+ val LINKIS_FLINK_JOB_MANAGER_MEMORY = CommonVars[Int]("flink.jobmanager.memory", 1024) //Unit: M(单位为M)
+ val LINKIS_FLINK_TASK_MANAGER_MEMORY = CommonVars[Int]("flink.taskmanager.memory", 4096) //Unit: M(单位为M)
val LINKIS_FLINK_TASK_SLOTS = CommonVars[Int]("flink.taskmanager.numberOfTaskSlots", 2)
val LINKIS_FLINK_TASK_MANAGER_CPU_CORES = CommonVars[Int]("flink.taskmanager.cpu.cores", 2)
val LINKIS_FLINK_CONTAINERS = CommonVars[Int]("flink.container.num", 2)
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
index 119bdd44c..bfff2f8f6 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
@@ -24,7 +24,7 @@ import java.util.Objects
import com.google.common.collect.Lists
import org.apache.linkis.engineconnplugin.flink.client.config.Environment
import org.apache.linkis.engineconnplugin.flink.client.factory.LinkisYarnClusterClientFactory
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration.{Configuration, DeploymentOptionsInternal, GlobalConfiguration}
import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
index f0c6d8f32..0875a2c89 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.Future
import java.util.function.Supplier
import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase}
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.ClusterClientJobClientAdapter
import org.apache.flink.client.program.{ClusterClient, ClusterClientProvider}
import org.apache.flink.table.api.{ResultKind, TableResult}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
index 20e353959..7cbc9e1f1 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
@@ -22,7 +22,7 @@ import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext
import org.apache.linkis.engineconnplugin.flink.client.deployment.YarnApplicationClusterDescriptorAdapter
import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
import scala.concurrent.duration.Duration
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
index 152aeb735..57d32eb10 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
@@ -47,8 +47,9 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter] extends ManageableOnceExe
}.toMap
doSubmit(onceExecutorExecutionContext, options)
if(isCompleted) return
- if (null == clusterDescriptor.getClusterID)
+ if (null == clusterDescriptor.getClusterID) {
throw new ExecutorInitException("The application start failed, since yarn applicationId is null.")
+ }
setApplicationId(clusterDescriptor.getClusterID.toString)
setApplicationURL(clusterDescriptor.getWebInterfaceUrl)
info(s"Application is started, applicationId: $getApplicationId, applicationURL: $getApplicationURL.")
@@ -61,7 +62,9 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter] extends ManageableOnceExe
val id: Long
- override def getId: String = "FlinkOnceApp_"+ id
+ def getClusterDescriptorAdapter: T = clusterDescriptor
+
+ override def getId: String = "FlinkOnceApp_" + id
protected def closeDaemon(): Unit = {
if (daemonThread != null) daemonThread.cancel(true)
@@ -78,14 +81,14 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter] extends ManageableOnceExe
}
override protected def waitToRunning(): Unit = {
- if(!isCompleted) daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
+ if (!isCompleted) daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
private var lastStatus: JobStatus = JobStatus.INITIALIZING
- private var lastPrintTime = 0l
+ private var lastPrintTime = 0L
private val printInterval = math.max(FLINK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong, 5 * 60 * 1000)
private var fetchJobStatusFailedNum = 0
- override def run(): Unit = if(!isCompleted) {
- val jobStatus = Utils.tryCatch(clusterDescriptor.getJobStatus){t =>
- if(fetchJobStatusFailedNum >= FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue) {
+ override def run(): Unit = if (!isCompleted) {
+ val jobStatus = Utils.tryCatch(clusterDescriptor.getJobStatus) {t =>
+ if (fetchJobStatusFailedNum >= FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue) {
error(s"Fetch job status has failed max ${FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue} times, now stop this FlinkEngineConn.", t)
tryFailed()
close()
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index d357eeb09..485b9ed10 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -23,6 +23,12 @@ import java.util
import java.util.Collections
import com.google.common.collect.Lists
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.configuration._
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
+import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
+import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconnplugin.flink.client.config.Environment
@@ -39,19 +45,21 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory,
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
import org.apache.linkis.manager.label.entity.engine._
-import org.apache.commons.lang.StringUtils
-import org.apache.flink.configuration._
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
-import org.apache.flink.streaming.api.CheckpointingMode
-import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
-
-import scala.collection.convert.decorateAsScala._
+import scala.collection.JavaConverters._
class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging {
override protected def createEngineConnSession(engineCreationContext: EngineCreationContext): Any = {
- val options = engineCreationContext.getOptions
+ var options = engineCreationContext.getOptions
+ // Filter the options (startUpParams)
+ options = options.asScala.mapValues{
+ case value if value.contains(FLINK_PARAMS_BLANK_PLACEHOLER.getValue) =>
+ info(s"Transform option value: [$value]")
+ value.replace(FLINK_PARAMS_BLANK_PLACEHOLER.getValue, " ")
+ case v1 => v1
+ }.toMap.asJava
+ engineCreationContext.setOptions(options)
val environmentContext = createEnvironmentContext(engineCreationContext)
val flinkEngineConnContext = createFlinkEngineConnContext(environmentContext)
val executionContext = createExecutionContext(options, environmentContext)
@@ -68,56 +76,75 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
val flinkLibRemotePath = FLINK_LIB_REMOTE_PATH.getValue(options)
val flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
val providedLibDirsArray = FLINK_LIB_LOCAL_PATH.getValue(options).split(",")
- val shipDirsArray = FLINK_SHIP_DIRECTORIES.getValue(options).split(",")
+ var shipDirsArray = FLINK_SHIP_DIRECTORIES.getValue(options).split(",")
+ shipDirsArray = shipDirsArray match {
+ case pathArray: Array[String] =>
+ pathArray.map(dir => {
+ if (new File(dir).exists()) dir
+ else getClass.getClassLoader.getResource(dir).getPath
+ })
+ case _ => new Array[String](0)
+ }
val context = new EnvironmentContext(defaultEnv, new Configuration, hadoopConfDir, flinkConfDir, flinkHome,
flinkDistJarPath, flinkLibRemotePath, providedLibDirsArray, shipDirsArray, null)
- //Step1: environment-level configurations(第一步: 环境级别配置)
+ // Step1: environment-level configurations
val jobName = options.getOrDefault("flink.app.name", "EngineConn-Flink")
val yarnQueue = LINKIS_QUEUE_NAME.getValue(options)
val parallelism = FLINK_APP_DEFAULT_PARALLELISM.getValue(options)
- val jobManagerMemory = LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(options) + "G"
- val taskManagerMemory = LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(options) + "G"
+ val jobManagerMemory = LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(options) + "M"
+ val taskManagerMemory = LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(options) + "M"
val numberOfTaskSlots = LINKIS_FLINK_TASK_SLOTS.getValue(options)
- info(s"Use yarn queue $yarnQueue, and set parallelism = $parallelism, jobManagerMemory = $jobManagerMemory G, taskManagerMemory = $taskManagerMemory G, numberOfTaskSlots = $numberOfTaskSlots.")
- //Step2: application-level configurations(第二步: 应用级别配置)
- //construct app-config(构建应用配置)
+ info(s"Use yarn queue $yarnQueue, and set parallelism = $parallelism, jobManagerMemory = $jobManagerMemory, taskManagerMemory = $taskManagerMemory, numberOfTaskSlots = $numberOfTaskSlots.")
+ // Step2: application-level configurations
+ // construct app-config
val flinkConfig = context.getFlinkConfig
- //construct jar-dependencies(构建依赖jar包环境)
+ // construct jar-dependencies
val flinkUserLibRemotePath = FLINK_USER_LIB_REMOTE_PATH.getValue(options).split(",")
val providedLibDirList = Lists.newArrayList(flinkUserLibRemotePath.filter(StringUtils.isNotBlank): _*)
val flinkUserRemotePathList = Lists.newArrayList(flinkLibRemotePath.split(",").filter(StringUtils.isNotBlank): _*)
if (flinkUserRemotePathList != null && flinkUserRemotePathList.size() > 0) providedLibDirList.addAll(flinkUserRemotePathList)
- //if(StringUtils.isNotBlank(flinkLibRemotePath)) providedLibDirList.add(flinkLibRemotePath)
+ // if(StringUtils.isNotBlank(flinkLibRemotePath)) providedLibDirList.add(flinkLibRemotePath)
flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, providedLibDirList)
- //construct jar-dependencies(构建依赖jar包环境)
- flinkConfig.set(YarnConfigOptions.SHIP_ARCHIVES, context.getShipDirs)
+ // construct jar-dependencies
+ flinkConfig.set(YarnConfigOptions.SHIP_FILES, context.getShipDirs)
// set user classpaths
val classpaths = FLINK_APPLICATION_CLASSPATH.getValue(options)
if (StringUtils.isNotBlank(classpaths)) {
info(s"Add $classpaths to flink application classpath.")
flinkConfig.set(PipelineOptions.CLASSPATHS, util.Arrays.asList(classpaths.split(","): _*))
}
- //yarn application name(yarn 作业名称)
+ // yarn application name
flinkConfig.set(YarnConfigOptions.APPLICATION_NAME, jobName)
- //yarn queue
+ // yarn queue
flinkConfig.set(YarnConfigOptions.APPLICATION_QUEUE, yarnQueue)
- //Configure resource/concurrency (设置:资源/并发度)
+ // Configure resource/concurrency
flinkConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism)
flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(jobManagerMemory))
flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory))
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
- if(FLINK_REPORTER_ENABLE.getValue) {
- flinkConfig.set(MetricOptions.REPORTER_CLASS, FLINK_REPORTER_CLASS.getValue)
- flinkConfig.set(MetricOptions.REPORTER_INTERVAL, Duration.ofMillis(FLINK_REPORTER_INTERVAL.getValue.toLong))
+ // set extra configs
+ options.asScala.filter{ case(key, _) => key.startsWith(FLINK_CONFIG_PREFIX)}.foreach {
+ case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value)
+ }
+ // set kerberos config
+ if(FLINK_KERBEROS_ENABLE.getValue(options)) {
+ flinkConfig.set(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, FLINK_KERBEROS_LOGIN_CONTEXTS.getValue(options))
+ flinkConfig.set(SecurityOptions.KERBEROS_KRB5_PATH, FLINK_KERBEROS_CONF_PATH.getValue(options))
+ flinkConfig.set(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, FLINK_KERBEROS_LOGIN_PRINCIPAL.getValue(options))
+ flinkConfig.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB, FLINK_KERBEROS_LOGIN_KEYTAB.getValue(options))
}
- //set savePoint(设置 savePoint)
+ if(FLINK_REPORTER_ENABLE.getValue(options)) {
+ flinkConfig.set(MetricOptions.REPORTER_CLASS, FLINK_REPORTER_CLASS.getValue(options))
+ flinkConfig.set(MetricOptions.REPORTER_INTERVAL, Duration.ofMillis(FLINK_REPORTER_INTERVAL.getValue(options).toLong))
+ }
+ // set savePoint
val savePointPath = FLINK_SAVE_POINT_PATH.getValue(options)
if (StringUtils.isNotBlank(savePointPath)) {
val allowNonRestoredState = FLINK_APP_ALLOW_NON_RESTORED_STATUS.getValue(options).toBoolean
val savepointRestoreSettings = SavepointRestoreSettings.forPath(savePointPath, allowNonRestoredState)
SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, flinkConfig)
}
- //Configure user-entrance jar. Can be remote, but only support 1 jar(设置:用户入口jar:可以远程,只能设置1个jar)
+ // Configure user-entrance jar. Can be HDFS file, but only support 1 jar
val flinkMainClassJar = FLINK_APPLICATION_MAIN_CLASS_JAR.getValue(options)
if(StringUtils.isNotBlank(flinkMainClassJar)) {
val flinkMainClassJarPath = if (new File(flinkMainClassJar).exists()) flinkMainClassJar
@@ -125,9 +152,10 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
info(s"Ready to use $flinkMainClassJarPath as main class jar to submit application to Yarn.")
flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(flinkMainClassJarPath))
flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName)
+ flinkConfig.setBoolean(DeploymentOptions.ATTACHED, FLINK_EXECUTION_ATTACHED.getValue(options))
context.setDeploymentTarget(YarnDeploymentTarget.APPLICATION)
addApplicationLabels(engineCreationContext)
- } else if(isOnceEngineConn(engineCreationContext.getLabels())) {
+ } else if (isOnceEngineConn(engineCreationContext.getLabels())) {
flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
} else {
flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
@@ -161,11 +189,13 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
val environment = environmentContext.getDeploymentTarget match {
case YarnDeploymentTarget.PER_JOB | YarnDeploymentTarget.SESSION =>
val planner = FlinkEnvConfiguration.FLINK_SQL_PLANNER.getValue(options)
- if (!ExecutionEntry.AVAILABLE_PLANNERS.contains(planner.toLowerCase))
+ if (!ExecutionEntry.AVAILABLE_PLANNERS.contains(planner.toLowerCase)) {
throw new FlinkInitFailedException("Planner must be one of these: " + String.join(", ", ExecutionEntry.AVAILABLE_PLANNERS))
+ }
val executionType = FlinkEnvConfiguration.FLINK_SQL_EXECUTION_TYPE.getValue(options)
- if (!ExecutionEntry.AVAILABLE_EXECUTION_TYPES.contains(executionType.toLowerCase))
+ if (!ExecutionEntry.AVAILABLE_EXECUTION_TYPES.contains(executionType.toLowerCase)) {
throw new FlinkInitFailedException("Execution type must be one of these: " + String.join(", ", ExecutionEntry.AVAILABLE_EXECUTION_TYPES))
+ }
val properties = new util.HashMap[String, String]
properties.put(Environment.EXECUTION_ENTRY + "." + ExecutionEntry.EXECUTION_PLANNER, planner)
properties.put(Environment.EXECUTION_ENTRY + "." + ExecutionEntry.EXECUTION_TYPE, executionType)
@@ -202,6 +232,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
}
checkpointConfig.setCheckpointTimeout(checkpointTimeout)
checkpointConfig.setMinPauseBetweenCheckpoints(checkpointMinPause)
+ checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+ checkpointConfig.configure(environmentContext.getFlinkConfig)
}
executionContext
}
@@ -213,7 +245,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
override protected def getEngineConnType: EngineType = EngineType.FLINK
- private val executorFactoryArray = Array[ExecutorFactory](ClassUtil.getInstance(classOf[FlinkSQLExecutorFactory], new FlinkSQLExecutorFactory),
+ private val executorFactoryArray = Array[ExecutorFactory](ClassUtil.getInstance(classOf[FlinkSQLExecutorFactory], new FlinkSQLExecutorFactory),
ClassUtil.getInstance(classOf[FlinkApplicationExecutorFactory], new FlinkApplicationExecutorFactory),
ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new FlinkCodeExecutorFactory))
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
index 5f4e44862..cf8e95f57 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
@@ -36,7 +36,7 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
override protected def getCommands(implicit engineConnBuildRequest: EngineConnBuildRequest): Array[String] = {
val properties = engineConnBuildRequest.engineConnCreationDesc.properties
- properties.put(EnvConfiguration.ENGINE_CONN_MEMORY.key, FlinkResourceConfiguration.LINKIS_FLINK_CLIENT_MEMORY.getValue(properties) + "G")
+ properties.put(EnvConfiguration.ENGINE_CONN_MEMORY.key, FlinkResourceConfiguration.LINKIS_FLINK_CLIENT_MEMORY.getValue(properties) + "M")
super.getCommands
}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
new file mode 100644
index 000000000..65096ab80
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager
+import org.apache.linkis.engineconnplugin.flink.client.deployment.ClusterDescriptorAdapter
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.engineconnplugin.flink.executor.FlinkOnceExecutor
+import org.apache.linkis.manager.common.operator.Operator
+
+
+class TriggerSavepointOperator extends Operator with Logging {
+
+ override def getNames: Array[String] = Array("doSavepoint")
+
+ override def apply(implicit parameters: Map[String, Any]): Map[String, Any] = {
+ val savepoint = getAsThrow("savepointPath")
+ val mode = getAsThrow("mode")
+ info(s"try to $mode savepoint with path $savepoint.")
+ OnceExecutorManager.getInstance.getReportExecutor match {
+ case flinkExecutor: FlinkOnceExecutor[_] =>
+ val writtenSavepoint = flinkExecutor.getClusterDescriptorAdapter.doSavepoint(savepoint, mode)
+ Map("writtenSavepoint" -> writtenSavepoint)
+ case executor => throw new JobExecutionException("Not support to do savepoint for " + executor.getClass.getSimpleName)
+ }
+ }
+}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
index 38bf3ac92..129c6d21d 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
@@ -31,11 +31,11 @@ class FlinkEngineConnResourceFactory extends AbstractEngineResourceFactory {
properties.put(FLINK_APP_DEFAULT_PARALLELISM.key, String.valueOf(containers * LINKIS_FLINK_TASK_SLOTS.getValue(properties)))
containers
} else math.round(FLINK_APP_DEFAULT_PARALLELISM.getValue(properties) * 1.0f / LINKIS_FLINK_TASK_SLOTS.getValue(properties))
- val yarnMemory = ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(properties) * containers + "G") +
- ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(properties) + "G")
+ val yarnMemory = ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(properties) * containers + "M") +
+ ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(properties) + "M")
val yarnCores = LINKIS_FLINK_TASK_MANAGER_CPU_CORES.getValue(properties) * containers + 1
new DriverAndYarnResource(
- new LoadInstanceResource(ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_CLIENT_MEMORY.getValue(properties) + "G"),
+ new LoadInstanceResource(ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_CLIENT_MEMORY.getValue(properties) + "M"),
LINKIS_FLINK_CLIENT_CORES,
1),
new YarnResource(yarnMemory, yarnCores, 0, LINKIS_QUEUE_NAME.getValue(properties))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org