Posted to commits@linkis.apache.org by pe...@apache.org on 2022/05/05 11:22:01 UTC

[incubator-linkis] branch dev-1.1.2 updated: Flexible custom Flink configuration and resources to Flink Engine (#2074)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 56c5be47c Flexible custom Flink configuration and resources to Flink Engine (#2074)
56c5be47c is described below

commit 56c5be47cfa915f8be531d30d3e5b5282f749626
Author: David hua <da...@hotmail.com>
AuthorDate: Thu May 5 19:21:55 2022 +0800

    Flexible custom Flink configuration and resources to Flink Engine (#2074)
    
    * Update StringUtils from commons-lang to commons-lang3.
    
    * Add Kerberos support and a new TriggerSavepointOperator to the Flink EngineConn.
    
    * Optimize the Flink EngineConn to support Kerberos login contexts.
    
    * 1. Add support for extra Flink configurations.
    2. Fix the bug of the AppMaster exiting abnormally in Flink application mode.
    
    * Change the unit of the Flink Yarn queue engine resources from 'G' to 'M'.
    
    * Use YarnConfigOptions.SHIP_FILES to upload extra file/directory resources; define a blank placeholder for Flink parameters.
    
    * Format the Flink engine code with "spotless" #2073
    
    Co-authored-by: wushengyeyouya <69...@qq.com>
---
 .../deployment/ClusterDescriptorAdapter.java       | 21 ++++-
 .../YarnApplicationClusterDescriptorAdapter.java   |  2 +-
 .../flink/client/result/ChangelogResult.java       |  2 +-
 .../flink/config/FlinkEnvConfiguration.scala       | 11 +++
 .../flink/config/FlinkResourceConfiguration.scala  |  6 +-
 .../flink/context/EnvironmentContext.scala         |  2 +-
 .../flink/executor/FlinkCodeOnceExecutor.scala     |  2 +-
 .../flink/executor/FlinkJarOnceExecutor.scala      |  2 +-
 .../flink/executor/FlinkOnceExecutor.scala         | 17 ++--
 .../flink/factory/FlinkEngineConnFactory.scala     | 94 +++++++++++++++-------
 .../launch/FlinkEngineConnLaunchBuilder.scala      |  2 +-
 .../flink/operator/TriggerSavepointOperator.scala  | 43 ++++++++++
 .../resource/FlinkEngineConnResourceFactory.scala  |  6 +-
 13 files changed, 159 insertions(+), 51 deletions(-)

diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
index 38ae70b4c..c8a8090a5 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/ClusterDescriptorAdapter.java
@@ -21,7 +21,7 @@ import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
 import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration;
 import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.deployment.ClusterRetrieveException;
@@ -97,6 +97,25 @@ public abstract class ClusterDescriptorAdapter implements Closeable {
         bridgeClientRequest(this.executionContext, jobId, () -> clusterClient.cancel(jobId), true);
     }
 
+    public String doSavepoint(String savepoint, String mode) throws JobExecutionException {
+        LOG.info("try to {} savepoint in path {}.", mode, savepoint);
+        Supplier<CompletableFuture<String>> function;
+        switch (mode) {
+            case "trigger":
+                function = () -> clusterClient.triggerSavepoint(jobId, savepoint);
+                break;
+            case "cancel":
+                function = () -> clusterClient.cancelWithSavepoint(jobId, savepoint);
+                break;
+            case "stop":
+                function = () -> clusterClient.stopWithSavepoint(jobId, false, savepoint);
+                break;
+            default:
+                throw new JobExecutionException("not supported savepoint operator mode " + mode);
+        }
+        return bridgeClientRequest(this.executionContext, jobId, function, false);
+    }
+
     /**
      * The reason of using ClusterClient instead of JobClient to retrieve a cluster is the JobClient
      * can't know whether the job is finished on yarn-per-job mode.
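
The new doSavepoint method bridges a mode string to the matching Flink ClusterClient call. A minimal standalone sketch of the same dispatch, assuming Flink 1.12's ClusterClient API (the name savepointAction is illustrative, not from this commit):

    import java.util.concurrent.CompletableFuture
    import org.apache.flink.api.common.JobID
    import org.apache.flink.client.program.ClusterClient

    // Map a savepoint mode to the ClusterClient call that performs it.
    def savepointAction(client: ClusterClient[_], jobId: JobID,
                        path: String, mode: String): CompletableFuture[String] = mode match {
      case "trigger" => client.triggerSavepoint(jobId, path)         // snapshot; job keeps running
      case "cancel"  => client.cancelWithSavepoint(jobId, path)      // snapshot, then cancel
      case "stop"    => client.stopWithSavepoint(jobId, false, path) // snapshot, then graceful stop
      case other => throw new IllegalArgumentException(s"unsupported savepoint mode: $other")
    }
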
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/YarnApplicationClusterDescriptorAdapter.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/YarnApplicationClusterDescriptorAdapter.java
index e46bd70a4..0b1fc64a6 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/YarnApplicationClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/deployment/YarnApplicationClusterDescriptorAdapter.java
@@ -20,7 +20,7 @@ package org.apache.linkis.engineconnplugin.flink.client.deployment;
 import org.apache.linkis.engineconnplugin.flink.client.context.ExecutionContext;
 import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.application.ApplicationConfiguration;
 import org.apache.flink.client.program.ClusterClientProvider;
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java
index cf1ad8ce3..160a1f8e3 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/java/org/apache/linkis/engineconnplugin/flink/client/result/ChangelogResult.java
@@ -20,7 +20,7 @@ package org.apache.linkis.engineconnplugin.flink.client.result;
 import org.apache.linkis.engineconnplugin.flink.exception.SqlExecutionException;
 import org.apache.linkis.engineconnplugin.flink.listener.RowsType;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
index 2551cd31f..83b504356 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkEnvConfiguration.scala
@@ -24,6 +24,7 @@ import org.apache.linkis.engineconnplugin.flink.client.config.entries.ExecutionE
 
 object FlinkEnvConfiguration {
 
+  val SPARK_LIB_PATH = CommonVars("spark.lib.path", CommonVars("SPARK_HOME", "/appcom/Install/spark").getValue + "/jars")
   val FLINK_HOME_ENV = "FLINK_HOME"
   val FLINK_CONF_DIR_ENV = "FLINK_CONF_DIR"
   val FLINK_VERSION = CommonVars("flink.version", "1.12.2")
@@ -64,4 +65,14 @@ object FlinkEnvConfiguration {
   val FLINK_REPORTER_CLASS = CommonVars("linkis.flink.reporter.class", "")
   val FLINK_REPORTER_INTERVAL = CommonVars("linkis.flink.reporter.interval", new TimeType("60s"))
 
+  val FLINK_EXECUTION_ATTACHED = CommonVars("linkis.flink.execution.attached", true)
+  val FLINK_CONFIG_PREFIX = "_FLINK_CONFIG_."
+
+  val FLINK_KERBEROS_ENABLE = CommonVars("linkis.flink.kerberos.enable", false)
+  val FLINK_KERBEROS_LOGIN_CONTEXTS = CommonVars("linkis.flink.kerberos.login.contexts", "Client,KafkaClient")
+  val FLINK_KERBEROS_LOGIN_KEYTAB = CommonVars("linkis.flink.kerberos.login.keytab", "")
+  val FLINK_KERBEROS_LOGIN_PRINCIPAL = CommonVars("linkis.flink.kerberos.login.principal", "")
+  val FLINK_KERBEROS_CONF_PATH = CommonVars("linkis.flink.kerberos.krb5-conf.path", "")
+
+  val FLINK_PARAMS_BLANK_PLACEHOLER = CommonVars("linkis.flink.params.placeholder.blank", "\\0x001")
 }
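
Two of the new knobs deserve a note. Options prefixed with FLINK_CONFIG_PREFIX ("_FLINK_CONFIG_.") are later copied into the Flink configuration with the prefix stripped, and the Kerberos settings are only read when linkis.flink.kerberos.enable is true. A hypothetical startup-options map illustrating both (all values here are examples, not defaults from this commit):

    // Each _FLINK_CONFIG_.-prefixed entry becomes a native Flink setting,
    // e.g. state.checkpoints.num-retained = 3.
    val startUpOptions = Map(
      "_FLINK_CONFIG_.state.checkpoints.num-retained" -> "3",
      "linkis.flink.kerberos.enable" -> "true",
      "linkis.flink.kerberos.login.principal" -> "flink/node01@EXAMPLE.COM",       // example only
      "linkis.flink.kerberos.login.keytab" -> "/etc/security/keytabs/flink.keytab" // example only
    )
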
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
index 31ac1663d..dc6b0692d 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/config/FlinkResourceConfiguration.scala
@@ -22,12 +22,12 @@ import org.apache.linkis.common.conf.CommonVars
 
 object FlinkResourceConfiguration {
 
-  val LINKIS_FLINK_CLIENT_MEMORY = CommonVars[Int]("flink.client.memory", 4) //Unit: G(单位为G)
+  val LINKIS_FLINK_CLIENT_MEMORY = CommonVars[Int]("flink.client.memory", 1024) //Unit: M(单位为M)
   val LINKIS_FLINK_CLIENT_CORES = 1 //Fixed to 1(固定为1) CommonVars[Int]("wds.linkis.driver.cores", 1)
 
 
-  val LINKIS_FLINK_JOB_MANAGER_MEMORY = CommonVars[Int]("flink.jobmanager.memory", 2) //Unit: G(单位为G)
-  val LINKIS_FLINK_TASK_MANAGER_MEMORY = CommonVars[Int]("flink.taskmanager.memory", 4) //Unit: G(单位为G)
+  val LINKIS_FLINK_JOB_MANAGER_MEMORY = CommonVars[Int]("flink.jobmanager.memory", 1024) //Unit: M(单位为M)
+  val LINKIS_FLINK_TASK_MANAGER_MEMORY = CommonVars[Int]("flink.taskmanager.memory", 4096) //Unit: M(单位为M)
   val LINKIS_FLINK_TASK_SLOTS = CommonVars[Int]("flink.taskmanager.numberOfTaskSlots", 2)
   val LINKIS_FLINK_TASK_MANAGER_CPU_CORES = CommonVars[Int]("flink.taskmanager.cpu.cores", 2)
   val LINKIS_FLINK_CONTAINERS = CommonVars[Int]("flink.container.num", 2)
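
With the defaults now in megabytes, the Yarn request later computed by FlinkEngineConnResourceFactory works out as follows; a back-of-envelope sketch using the new default values:

    // 2 containers x 4096 M per TaskManager, plus 1024 M for the JobManager.
    val containers = 2
    val taskManagerMb = 4096
    val jobManagerMb = 1024
    val yarnMemoryMb = containers * taskManagerMb + jobManagerMb // 9216 M requested in total
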
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
index 119bdd44c..bfff2f8f6 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/context/EnvironmentContext.scala
@@ -24,7 +24,7 @@ import java.util.Objects
 import com.google.common.collect.Lists
 import org.apache.linkis.engineconnplugin.flink.client.config.Environment
 import org.apache.linkis.engineconnplugin.flink.client.factory.LinkisYarnClusterClientFactory
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.configuration.{Configuration, DeploymentOptionsInternal, GlobalConfiguration}
 import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
 
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
index f0c6d8f32..0875a2c89 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkCodeOnceExecutor.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.Future
 import java.util.function.Supplier
 
 import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase}
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.ClusterClientJobClientAdapter
 import org.apache.flink.client.program.{ClusterClient, ClusterClientProvider}
 import org.apache.flink.table.api.{ResultKind, TableResult}
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
index 20e353959..7cbc9e1f1 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkJarOnceExecutor.scala
@@ -22,7 +22,7 @@ import org.apache.linkis.engineconn.once.executor.OnceExecutorExecutionContext
 import org.apache.linkis.engineconnplugin.flink.client.deployment.YarnApplicationClusterDescriptorAdapter
 import org.apache.linkis.engineconnplugin.flink.config.FlinkEnvConfiguration._
 import org.apache.linkis.engineconnplugin.flink.context.FlinkEngineConnContext
-import org.apache.commons.lang.StringUtils
+import org.apache.commons.lang3.StringUtils
 
 import scala.concurrent.duration.Duration
 
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
index 152aeb735..57d32eb10 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/executor/FlinkOnceExecutor.scala
@@ -47,8 +47,9 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter] extends ManageableOnceExe
     }.toMap
     doSubmit(onceExecutorExecutionContext, options)
     if(isCompleted) return
-    if (null == clusterDescriptor.getClusterID)
+    if (null == clusterDescriptor.getClusterID) {
       throw new ExecutorInitException("The application start failed, since yarn applicationId is null.")
+    }
     setApplicationId(clusterDescriptor.getClusterID.toString)
     setApplicationURL(clusterDescriptor.getWebInterfaceUrl)
     info(s"Application is started, applicationId: $getApplicationId, applicationURL: $getApplicationURL.")
@@ -61,7 +62,9 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter] extends ManageableOnceExe
 
   val id: Long
 
-  override def getId: String = "FlinkOnceApp_"+ id
+  def getClusterDescriptorAdapter: T = clusterDescriptor
+
+  override def getId: String = "FlinkOnceApp_" + id
 
   protected def closeDaemon(): Unit = {
     if (daemonThread != null) daemonThread.cancel(true)
@@ -78,14 +81,14 @@ trait FlinkOnceExecutor[T <: ClusterDescriptorAdapter] extends ManageableOnceExe
   }
 
   override protected def waitToRunning(): Unit = {
-    if(!isCompleted) daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
+    if (!isCompleted) daemonThread = Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
       private var lastStatus: JobStatus = JobStatus.INITIALIZING
-      private var lastPrintTime = 0l
+      private var lastPrintTime = 0L
       private val printInterval = math.max(FLINK_ONCE_APP_STATUS_FETCH_INTERVAL.getValue.toLong, 5 * 60 * 1000)
       private var fetchJobStatusFailedNum = 0
-      override def run(): Unit = if(!isCompleted) {
-        val jobStatus = Utils.tryCatch(clusterDescriptor.getJobStatus){t =>
-          if(fetchJobStatusFailedNum >= FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue) {
+      override def run(): Unit = if (!isCompleted) {
+        val jobStatus = Utils.tryCatch(clusterDescriptor.getJobStatus) {t =>
+          if (fetchJobStatusFailedNum >= FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue) {
             error(s"Fetch job status has failed max ${FLINK_ONCE_APP_STATUS_FETCH_FAILED_MAX.getValue} times, now stop this FlinkEngineConn.", t)
             tryFailed()
             close()
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
index d357eeb09..485b9ed10 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/factory/FlinkEngineConnFactory.scala
@@ -23,6 +23,12 @@ import java.util
 import java.util.Collections
 
 import com.google.common.collect.Lists
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.configuration._
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
+import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
+import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
 import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.engineconn.common.creation.EngineCreationContext
 import org.apache.linkis.engineconnplugin.flink.client.config.Environment
@@ -39,19 +45,21 @@ import org.apache.linkis.manager.engineplugin.common.creation.{ExecutorFactory,
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
 import org.apache.linkis.manager.label.entity.engine._
-import org.apache.commons.lang.StringUtils
-import org.apache.flink.configuration._
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings
-import org.apache.flink.streaming.api.CheckpointingMode
-import org.apache.flink.yarn.configuration.{YarnConfigOptions, YarnDeploymentTarget}
-
-import scala.collection.convert.decorateAsScala._
+import scala.collection.JavaConverters._
 
 
 class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging {
 
   override protected def createEngineConnSession(engineCreationContext: EngineCreationContext): Any = {
-    val options = engineCreationContext.getOptions
+    var options = engineCreationContext.getOptions
+    // Filter the options (startUpParams)
+    options = options.asScala.mapValues{
+      case value if value.contains(FLINK_PARAMS_BLANK_PLACEHOLER.getValue) =>
+        info(s"Transform option value: [$value]")
+        value.replace(FLINK_PARAMS_BLANK_PLACEHOLER.getValue, " ")
+      case v1 => v1
+    }.toMap.asJava
+    engineCreationContext.setOptions(options)
     val environmentContext = createEnvironmentContext(engineCreationContext)
     val flinkEngineConnContext = createFlinkEngineConnContext(environmentContext)
     val executionContext = createExecutionContext(options, environmentContext)
@@ -68,56 +76,75 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     val flinkLibRemotePath = FLINK_LIB_REMOTE_PATH.getValue(options)
     val flinkDistJarPath = FLINK_DIST_JAR_PATH.getValue(options)
     val providedLibDirsArray = FLINK_LIB_LOCAL_PATH.getValue(options).split(",")
-    val shipDirsArray = FLINK_SHIP_DIRECTORIES.getValue(options).split(",")
+    var shipDirsArray = FLINK_SHIP_DIRECTORIES.getValue(options).split(",")
+    shipDirsArray = shipDirsArray match {
+      case pathArray: Array[String] =>
+        pathArray.map(dir => {
+          if (new File(dir).exists()) dir
+          else getClass.getClassLoader.getResource(dir).getPath
+        })
+      case _ => new Array[String](0)
+    }
     val context = new EnvironmentContext(defaultEnv, new Configuration, hadoopConfDir, flinkConfDir, flinkHome,
       flinkDistJarPath, flinkLibRemotePath, providedLibDirsArray, shipDirsArray, null)
-    //Step1: environment-level configurations(第一步: 环境级别配置)
+    // Step1: environment-level configurations
     val jobName = options.getOrDefault("flink.app.name", "EngineConn-Flink")
     val yarnQueue = LINKIS_QUEUE_NAME.getValue(options)
     val parallelism = FLINK_APP_DEFAULT_PARALLELISM.getValue(options)
-    val jobManagerMemory = LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(options) + "G"
-    val taskManagerMemory = LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(options) + "G"
+    val jobManagerMemory = LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(options) + "M"
+    val taskManagerMemory = LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(options) + "M"
     val numberOfTaskSlots = LINKIS_FLINK_TASK_SLOTS.getValue(options)
-    info(s"Use yarn queue $yarnQueue, and set parallelism = $parallelism, jobManagerMemory = $jobManagerMemory G, taskManagerMemory = $taskManagerMemory G, numberOfTaskSlots = $numberOfTaskSlots.")
-    //Step2: application-level configurations(第二步: 应用级别配置)
-    //construct app-config(构建应用配置)
+    info(s"Use yarn queue $yarnQueue, and set parallelism = $parallelism, jobManagerMemory = $jobManagerMemory, taskManagerMemory = $taskManagerMemory, numberOfTaskSlots = $numberOfTaskSlots.")
+    // Step2: application-level configurations
+    // construct app-config
     val flinkConfig = context.getFlinkConfig
-    //construct jar-dependencies(构建依赖jar包环境)
+    // construct jar-dependencies
     val flinkUserLibRemotePath = FLINK_USER_LIB_REMOTE_PATH.getValue(options).split(",")
     val providedLibDirList = Lists.newArrayList(flinkUserLibRemotePath.filter(StringUtils.isNotBlank): _*)
     val flinkUserRemotePathList = Lists.newArrayList(flinkLibRemotePath.split(",").filter(StringUtils.isNotBlank): _*)
     if (flinkUserRemotePathList != null && flinkUserRemotePathList.size() > 0) providedLibDirList.addAll(flinkUserRemotePathList)
-    //if(StringUtils.isNotBlank(flinkLibRemotePath)) providedLibDirList.add(flinkLibRemotePath)
+    // if(StringUtils.isNotBlank(flinkLibRemotePath)) providedLibDirList.add(flinkLibRemotePath)
     flinkConfig.set(YarnConfigOptions.PROVIDED_LIB_DIRS, providedLibDirList)
-    //construct jar-dependencies(构建依赖jar包环境)
-    flinkConfig.set(YarnConfigOptions.SHIP_ARCHIVES, context.getShipDirs)
+    // construct jar-dependencies
+    flinkConfig.set(YarnConfigOptions.SHIP_FILES, context.getShipDirs)
     // set user classpaths
     val classpaths = FLINK_APPLICATION_CLASSPATH.getValue(options)
     if (StringUtils.isNotBlank(classpaths)) {
       info(s"Add $classpaths to flink application classpath.")
       flinkConfig.set(PipelineOptions.CLASSPATHS, util.Arrays.asList(classpaths.split(","): _*))
     }
-    //yarn application name(yarn 作业名称)
+    // yarn application name
     flinkConfig.set(YarnConfigOptions.APPLICATION_NAME, jobName)
-    //yarn queue
+    // yarn queue
     flinkConfig.set(YarnConfigOptions.APPLICATION_QUEUE, yarnQueue)
-    //Configure resource/concurrency (设置:资源/并发度)
+    // Configure resource/concurrency
     flinkConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism)
     flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(jobManagerMemory))
     flinkConfig.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse(taskManagerMemory))
     flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numberOfTaskSlots)
-    if(FLINK_REPORTER_ENABLE.getValue) {
-      flinkConfig.set(MetricOptions.REPORTER_CLASS, FLINK_REPORTER_CLASS.getValue)
-      flinkConfig.set(MetricOptions.REPORTER_INTERVAL, Duration.ofMillis(FLINK_REPORTER_INTERVAL.getValue.toLong))
+    // set extra configs
+    options.asScala.filter{ case(key, _) => key.startsWith(FLINK_CONFIG_PREFIX)}.foreach {
+      case (key, value) => flinkConfig.setString(key.substring(FLINK_CONFIG_PREFIX.length), value)
+    }
+    // set kerberos config
+    if(FLINK_KERBEROS_ENABLE.getValue(options)) {
+      flinkConfig.set(SecurityOptions.KERBEROS_LOGIN_CONTEXTS, FLINK_KERBEROS_LOGIN_CONTEXTS.getValue(options))
+      flinkConfig.set(SecurityOptions.KERBEROS_KRB5_PATH, FLINK_KERBEROS_CONF_PATH.getValue(options))
+      flinkConfig.set(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, FLINK_KERBEROS_LOGIN_PRINCIPAL.getValue(options))
+      flinkConfig.set(SecurityOptions.KERBEROS_LOGIN_KEYTAB, FLINK_KERBEROS_LOGIN_KEYTAB.getValue(options))
     }
-    //set savePoint(设置 savePoint)
+    if(FLINK_REPORTER_ENABLE.getValue(options)) {
+      flinkConfig.set(MetricOptions.REPORTER_CLASS, FLINK_REPORTER_CLASS.getValue(options))
+      flinkConfig.set(MetricOptions.REPORTER_INTERVAL, Duration.ofMillis(FLINK_REPORTER_INTERVAL.getValue(options).toLong))
+    }
+    // set savePoint
     val savePointPath = FLINK_SAVE_POINT_PATH.getValue(options)
     if (StringUtils.isNotBlank(savePointPath)) {
       val allowNonRestoredState = FLINK_APP_ALLOW_NON_RESTORED_STATUS.getValue(options).toBoolean
       val savepointRestoreSettings = SavepointRestoreSettings.forPath(savePointPath, allowNonRestoredState)
       SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, flinkConfig)
     }
-    //Configure user-entrance jar. Can be remote, but only support 1 jar(设置:用户入口jar:可以远程,只能设置1个jar)
+    // Configure user-entrance jar. Can be HDFS file, but only support 1 jar
     val flinkMainClassJar = FLINK_APPLICATION_MAIN_CLASS_JAR.getValue(options)
     if(StringUtils.isNotBlank(flinkMainClassJar)) {
       val flinkMainClassJarPath = if (new File(flinkMainClassJar).exists()) flinkMainClassJar
@@ -125,9 +152,10 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
       info(s"Ready to use $flinkMainClassJarPath as main class jar to submit application to Yarn.")
       flinkConfig.set(PipelineOptions.JARS, Collections.singletonList(flinkMainClassJarPath))
       flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName)
+      flinkConfig.setBoolean(DeploymentOptions.ATTACHED, FLINK_EXECUTION_ATTACHED.getValue(options))
       context.setDeploymentTarget(YarnDeploymentTarget.APPLICATION)
       addApplicationLabels(engineCreationContext)
-    } else if(isOnceEngineConn(engineCreationContext.getLabels())) {
+    } else if (isOnceEngineConn(engineCreationContext.getLabels())) {
       flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
     } else {
       flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
@@ -161,11 +189,13 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
     val environment = environmentContext.getDeploymentTarget match {
       case YarnDeploymentTarget.PER_JOB | YarnDeploymentTarget.SESSION =>
         val planner = FlinkEnvConfiguration.FLINK_SQL_PLANNER.getValue(options)
-        if (!ExecutionEntry.AVAILABLE_PLANNERS.contains(planner.toLowerCase))
+        if (!ExecutionEntry.AVAILABLE_PLANNERS.contains(planner.toLowerCase)) {
           throw new FlinkInitFailedException("Planner must be one of these: " + String.join(", ", ExecutionEntry.AVAILABLE_PLANNERS))
+        }
         val executionType = FlinkEnvConfiguration.FLINK_SQL_EXECUTION_TYPE.getValue(options)
-        if (!ExecutionEntry.AVAILABLE_EXECUTION_TYPES.contains(executionType.toLowerCase))
+        if (!ExecutionEntry.AVAILABLE_EXECUTION_TYPES.contains(executionType.toLowerCase)) {
           throw new FlinkInitFailedException("Execution type must be one of these: " + String.join(", ", ExecutionEntry.AVAILABLE_EXECUTION_TYPES))
+        }
         val properties = new util.HashMap[String, String]
         properties.put(Environment.EXECUTION_ENTRY + "." + ExecutionEntry.EXECUTION_PLANNER, planner)
         properties.put(Environment.EXECUTION_ENTRY + "." + ExecutionEntry.EXECUTION_TYPE, executionType)
@@ -202,6 +232,8 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
       }
       checkpointConfig.setCheckpointTimeout(checkpointTimeout)
       checkpointConfig.setMinPauseBetweenCheckpoints(checkpointMinPause)
+      checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+      checkpointConfig.configure(environmentContext.getFlinkConfig)
     }
     executionContext
   }
@@ -213,7 +245,7 @@ class FlinkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
 
   override protected def getEngineConnType: EngineType = EngineType.FLINK
 
-  private val executorFactoryArray =  Array[ExecutorFactory](ClassUtil.getInstance(classOf[FlinkSQLExecutorFactory], new FlinkSQLExecutorFactory),
+  private val executorFactoryArray = Array[ExecutorFactory](ClassUtil.getInstance(classOf[FlinkSQLExecutorFactory], new FlinkSQLExecutorFactory),
     ClassUtil.getInstance(classOf[FlinkApplicationExecutorFactory], new FlinkApplicationExecutorFactory),
     ClassUtil.getInstance(classOf[FlinkCodeExecutorFactory], new FlinkCodeExecutorFactory))
 
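The placeholder substitution at the top of createEngineConnSession restores blanks that were masked while the startup parameters were passed around. A self-contained sketch of the same transformation (the constant mirrors the FLINK_PARAMS_BLANK_PLACEHOLER default; the function name is illustrative):

    // "\\0x001" in Scala source is the literal text \0x001 used as the blank stand-in.
    val BlankPlaceholder = "\\0x001"

    def restoreBlanks(options: Map[String, String]): Map[String, String] =
      options.map {
        case (key, value) if value.contains(BlankPlaceholder) =>
          key -> value.replace(BlankPlaceholder, " ")
        case other => other
      }

    // restoreBlanks(Map("flink.app.args" -> "--name\\0x001demo"))
    //   returns Map("flink.app.args" -> "--name demo")
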
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
index 5f4e44862..cf8e95f57 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/launch/FlinkEngineConnLaunchBuilder.scala
@@ -36,7 +36,7 @@ class FlinkEngineConnLaunchBuilder extends JavaProcessEngineConnLaunchBuilder {
 
   override protected def getCommands(implicit engineConnBuildRequest: EngineConnBuildRequest): Array[String] = {
     val properties = engineConnBuildRequest.engineConnCreationDesc.properties
-    properties.put(EnvConfiguration.ENGINE_CONN_MEMORY.key, FlinkResourceConfiguration.LINKIS_FLINK_CLIENT_MEMORY.getValue(properties) + "G")
+    properties.put(EnvConfiguration.ENGINE_CONN_MEMORY.key, FlinkResourceConfiguration.LINKIS_FLINK_CLIENT_MEMORY.getValue(properties) + "M")
     super.getCommands
   }
 
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
new file mode 100644
index 000000000..65096ab80
--- /dev/null
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/operator/TriggerSavepointOperator.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineconnplugin.flink.operator
+
+import org.apache.linkis.common.utils.Logging
+import org.apache.linkis.engineconn.once.executor.creation.OnceExecutorManager
+import org.apache.linkis.engineconnplugin.flink.client.deployment.ClusterDescriptorAdapter
+import org.apache.linkis.engineconnplugin.flink.exception.JobExecutionException
+import org.apache.linkis.engineconnplugin.flink.executor.FlinkOnceExecutor
+import org.apache.linkis.manager.common.operator.Operator
+
+
+class TriggerSavepointOperator extends Operator with Logging {
+
+  override def getNames: Array[String] = Array("doSavepoint")
+
+  override def apply(implicit parameters: Map[String, Any]): Map[String, Any] = {
+    val savepoint = getAsThrow("savepointPath")
+    val mode = getAsThrow("mode")
+    info(s"try to $mode savepoint with path $savepoint.")
+    OnceExecutorManager.getInstance.getReportExecutor match {
+      case flinkExecutor: FlinkOnceExecutor[_] =>
+        val writtenSavepoint = flinkExecutor.getClusterDescriptorAdapter.doSavepoint(savepoint, mode)
+        Map("writtenSavepoint" -> writtenSavepoint)
+      case executor => throw new JobExecutionException("Not support to do savepoint for " + executor.getClass.getSimpleName)
+    }
+  }
+}
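
A hypothetical invocation showing the parameters the operator expects (the path is an example); getNames registers it under "doSavepoint", and the returned map carries the savepoint path reported by Flink:

    // Illustrative parameters for a savepoint operator request.
    val parameters: Map[String, Any] = Map(
      "savepointPath" -> "hdfs:///flink/savepoints/demo-app", // example path
      "mode" -> "trigger"                                     // trigger | cancel | stop
    )
    // new TriggerSavepointOperator().apply(parameters)
    //   would return Map("writtenSavepoint" -> <path written by Flink>)
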
diff --git a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
index 38bf3ac92..129c6d21d 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/flink/src/main/scala/org/apache/linkis/engineconnplugin/flink/resource/FlinkEngineConnResourceFactory.scala
@@ -31,11 +31,11 @@ class FlinkEngineConnResourceFactory extends AbstractEngineResourceFactory {
       properties.put(FLINK_APP_DEFAULT_PARALLELISM.key, String.valueOf(containers * LINKIS_FLINK_TASK_SLOTS.getValue(properties)))
       containers
     } else math.round(FLINK_APP_DEFAULT_PARALLELISM.getValue(properties) * 1.0f / LINKIS_FLINK_TASK_SLOTS.getValue(properties))
-    val yarnMemory = ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(properties) * containers + "G") +
-      ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(properties) + "G")
+    val yarnMemory = ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_TASK_MANAGER_MEMORY.getValue(properties) * containers + "M") +
+      ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_JOB_MANAGER_MEMORY.getValue(properties) + "M")
     val yarnCores = LINKIS_FLINK_TASK_MANAGER_CPU_CORES.getValue(properties) * containers + 1
     new DriverAndYarnResource(
-      new LoadInstanceResource(ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_CLIENT_MEMORY.getValue(properties) + "G"),
+      new LoadInstanceResource(ByteTimeUtils.byteStringAsBytes(LINKIS_FLINK_CLIENT_MEMORY.getValue(properties) + "M"),
         LINKIS_FLINK_CLIENT_CORES,
         1),
       new YarnResource(yarnMemory, yarnCores, 0, LINKIS_QUEUE_NAME.getValue(properties))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org