You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by zi...@apache.org on 2022/05/09 10:52:02 UTC

[dolphinscheduler] branch dev updated: [Feature- 9837][plugin/ui] support FlinkSQL Task (#9840)

This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2d36449444 [Feature- 9837][plugin/ui] support FlinkSQL Task (#9840)
2d36449444 is described below

commit 2d36449444567ef86f0ae8260ffdde338aab189c
Author: Dannila <94...@users.noreply.github.com>
AuthorDate: Mon May 9 18:51:55 2022 +0800

    [Feature- 9837][plugin/ui] support FlinkSQL Task (#9840)
    
    * flink_sql
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * '[refactor]flinksql'
    
    * [refactor] flinksql
    
    * '[refactor]flinksql'
    
    * [refactor] dolphinscheduler flinksql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * [refactor] flink sql
    
    * Update docs/docs/en/guide/task/flink.md
    
    * Update docs/docs/zh/guide/task/flink.md
    
    * prettier front-end code
    
    Co-authored-by: Jiajie Zhong <zh...@gmail.com>
    Co-authored-by: devosend <de...@gmail.com>
---
 docs/docs/en/guide/task/flink.md                   |  20 +-
 docs/docs/zh/guide/task/flink.md                   |  20 +-
 docs/img/tasks/demo/flink_sql_test.png             | Bin 0 -> 996404 bytes
 .../plugin/task/flink/FlinkArgsUtils.java          | 136 ---------
 .../plugin/task/flink/FlinkConstants.java          |  34 ++-
 .../plugin/task/flink/FlinkParameters.java         |  36 ++-
 .../plugin/task/flink/FlinkTask.java               | 325 +++++++++++++++++++--
 .../plugin/task/flink/ProgramType.java             |   5 +-
 .../plugin/task/flink/FlinkTaskTest.java           | 118 ++++++++
 dolphinscheduler-ui/src/locales/modules/en_US.ts   |   2 +
 dolphinscheduler-ui/src/locales/modules/zh_CN.ts   |   2 +
 .../task/components/node/fields/use-flink.ts       |  55 +++-
 .../task/components/node/fields/use-main-jar.ts    |   6 +-
 .../projects/task/components/node/format-data.ts   |   5 +
 .../task/components/node/tasks/use-flink.ts        |   2 +
 .../views/projects/task/components/node/types.ts   |   1 +
 16 files changed, 580 insertions(+), 187 deletions(-)

diff --git a/docs/docs/en/guide/task/flink.md b/docs/docs/en/guide/task/flink.md
index 4307002ed1..f0d081bdec 100644
--- a/docs/docs/en/guide/task/flink.md
+++ b/docs/docs/en/guide/task/flink.md
@@ -2,7 +2,11 @@
 
 ## Overview
 
-Flink task type for executing Flink programs. For Flink nodes, the worker submits the task by using the Flink command `flink run`. See [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) for more details.
+Flink task type, used to execute Flink programs. For Flink nodes:
+
+1. When the program type is Java, Scala or Python, the worker submits the task `flink run` using the Flink command. See [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) for more details.
+
+2. When the program type is SQL, the worker submit tasks using `sql-client.sh`. See [flink sql client](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/) for more details.
 
 ## Create Task
 
@@ -21,10 +25,12 @@ Flink task type for executing Flink programs. For Flink nodes, the worker submit
 - **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task.
 - **Delayed execution time**: The time (unit minute) that a task delays in execution.
 - **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm email will send and the task execution will fail.
-- **Program type**: Supports Java, Scala and Python.
+- **Program type**: Support Java, Scala, Python and SQL four languages.
 - **The class of main function**: The **full path** of Main Class, the entry point of the Flink program.
 - **Main jar package**: The jar package of the Flink program (upload by Resource Center).
 - **Deployment mode**: Support 2 deployment modes: cluster and local.
+- **Initialization script**: Script file to initialize session context.
+- **Script**: The sql script file developed by the user that should be executed.
 - **Flink version**: Select version according to the execution env.
 - **Task name** (optional): Flink task name.
 - **JobManager memory size**: Used to set the size of jobManager memories, which can be set according to the actual production environment.
@@ -64,6 +70,14 @@ Configure the required content according to the parameter descriptions above.
 
 ![demo-flink-simple](/img/tasks/demo/flink_task02.png)
 
+### Execute the FlinkSQL Program
+
+Configure the required content according to the parameter descriptions above.
+
+![demo-flink-sql-simple](/img/tasks/demo/flink_sql_test.png)
+
 ## Notice
 
-JAVA and Scala only used for identification, there is no difference. If use Python to develop Flink, there is no class of the main function and the rest is the same.
+- JAVA and Scala only used for identification, there is no difference. If use Python to develop Flink, there is no class of the main function and the rest is the same.
+
+- Use SQL to execute Flink SQL tasks, currently only Flink 1.13 and above are supported.
diff --git a/docs/docs/zh/guide/task/flink.md b/docs/docs/zh/guide/task/flink.md
index e4a0dac084..84acd60422 100644
--- a/docs/docs/zh/guide/task/flink.md
+++ b/docs/docs/zh/guide/task/flink.md
@@ -2,7 +2,11 @@
 
 ## 综述
 
-Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点,worker 会通过使用 flink 命令 `flink run` 的方式提交任务。更多详情查看 [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/)。
+Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点:
+
+1. 当程序类型为 Java、Scala 或 Python 时,worker 使用 Flink 命令提交任务 `flink run`。更多详情查看 [flink cli](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/) 。
+
+2. 当程序类型为 SQL 时,worker 使用`sql-client.sh` 提交任务。更多详情查看 [flink sql client](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/) 。
 
 ## 创建任务
 
@@ -21,10 +25,12 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点,worker 
 - 失败重试间隔:任务失败重新提交任务的时间间隔,以分钟为单位。
 - 延迟执行时间:任务延迟执行的时间,以分钟为单位。
 - 超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。
-- 程序类型:支持 Java、Scala 和 Python 三种语言。
+- 程序类型:支持 Java、Scala、 Python 和 SQL 四种语言。
 - 主函数的 Class:Flink 程序的入口 Main Class 的**全路径**。
 - 主程序包:执行 Flink 程序的 jar 包(通过资源中心上传)。
 - 部署方式:支持 cluster 和 local 两种模式的部署。
+- 初始化脚本:用于初始化会话上下文的脚本文件。
+- 脚本:用户开发的应该执行的 SQL 脚本文件。
 - Flink 版本:根据所需环境选择对应的版本即可。
 - 任务名称(选填):Flink 程序的名称。
 - jobManager 内存数:用于设置 jobManager 内存数,可根据实际生产环境设置对应的内存数。
@@ -64,6 +70,14 @@ Flink 任务类型,用于执行 Flink 程序。对于 Flink 节点,worker 
 
 ![demo-flink-simple](/img/tasks/demo/flink_task02.png)
 
+### 执行 FlinkSQL 程序
+
+根据上述参数说明,配置所需的内容即可。
+
+![demo-flink-sql-simple](/img/tasks/demo/flink_sql_test.png)
+
 ## 注意事项:
 
-Java 和 Scala 只是用来标识,没有区别,如果是 Python 开发的 Flink 则没有主函数的 class,其余的都一样。
+- Java 和 Scala 只是用来标识,没有区别,如果是 Python 开发的 Flink 则没有主函数的 class,其余的都一样。
+
+- 使用 SQL 执行 Flink SQL 任务,目前只支持 Flink 1.13及以上版本。
diff --git a/docs/img/tasks/demo/flink_sql_test.png b/docs/img/tasks/demo/flink_sql_test.png
new file mode 100644
index 0000000000..4cba8c7cf2
Binary files /dev/null and b/docs/img/tasks/demo/flink_sql_test.png differ
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
deleted file mode 100644
index ea047bea6c..0000000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.flink;
-
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * flink args utils
- */
-public class FlinkArgsUtils {
-
-    private FlinkArgsUtils() {
-        throw new IllegalStateException("Utility class");
-    }
-
-    private static final String LOCAL_DEPLOY_MODE = "local";
-    private static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
-
-    /**
-     * build args
-     *
-     * @param param flink parameters
-     * @return argument list
-     */
-    public static List<String> buildArgs(FlinkParameters param) {
-        List<String> args = new ArrayList<>();
-
-        String deployMode = "cluster";
-        String tmpDeployMode = param.getDeployMode();
-        if (StringUtils.isNotEmpty(tmpDeployMode)) {
-            deployMode = tmpDeployMode;
-        }
-        String others = param.getOthers();
-        if (!LOCAL_DEPLOY_MODE.equals(deployMode)) {
-            args.add(FlinkConstants.FLINK_RUN_MODE);  //-m
-
-            args.add(FlinkConstants.FLINK_YARN_CLUSTER);   //yarn-cluster
-
-            int slot = param.getSlot();
-            if (slot > 0) {
-                args.add(FlinkConstants.FLINK_YARN_SLOT);
-                args.add(String.format("%d", slot));   //-ys
-            }
-
-            String appName = param.getAppName();
-            if (StringUtils.isNotEmpty(appName)) { //-ynm
-                args.add(FlinkConstants.FLINK_APP_NAME);
-                args.add(ArgsUtils.escape(appName));
-            }
-
-            // judge flink version, the parameter -yn has removed from flink 1.10
-            String flinkVersion = param.getFlinkVersion();
-            if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
-                int taskManager = param.getTaskManager();
-                if (taskManager > 0) {                        //-yn
-                    args.add(FlinkConstants.FLINK_TASK_MANAGE);
-                    args.add(String.format("%d", taskManager));
-                }
-            }
-            String jobManagerMemory = param.getJobManagerMemory();
-            if (StringUtils.isNotEmpty(jobManagerMemory)) {
-                args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
-                args.add(jobManagerMemory); //-yjm
-            }
-
-            String taskManagerMemory = param.getTaskManagerMemory();
-            if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
-                args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
-                args.add(taskManagerMemory);
-            }
-
-            if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
-                String queue = param.getQueue();
-                if (StringUtils.isNotEmpty(queue)) { // -yqu
-                    args.add(FlinkConstants.FLINK_QUEUE);
-                    args.add(queue);
-                }
-            }
-        }
-
-        int parallelism = param.getParallelism();
-        if (parallelism > 0) {
-            args.add(FlinkConstants.FLINK_PARALLELISM);
-            args.add(String.format("%d", parallelism));   // -p
-        }
-
-        // If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly
-        // The task status will be synchronized with the cluster job status
-        args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
-
-        // -s -yqu -yat -yD -D
-        if (StringUtils.isNotEmpty(others)) {
-            args.add(others);
-        }
-
-        ProgramType programType = param.getProgramType();
-        String mainClass = param.getMainClass();
-        if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
-            args.add(FlinkConstants.FLINK_MAIN_CLASS);    //-c
-            args.add(param.getMainClass());          //main class
-        }
-
-        ResourceInfo mainJar = param.getMainJar();
-        if (mainJar != null) {
-            args.add(mainJar.getRes());
-        }
-
-        String mainArgs = param.getMainArgs();
-        if (StringUtils.isNotEmpty(mainArgs)) {
-            args.add(mainArgs);
-        }
-
-        return args;
-    }
-
-}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
index 7a3c1c6fc9..2e55de9b25 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkConstants.java
@@ -24,7 +24,20 @@ public class FlinkConstants {
     }
 
     /**
-     * flink
+     * flink command
+     * usage: flink run [OPTIONS] <jar-file> <arguments>
+     */
+    public static final String FLINK_COMMAND = "flink";
+    public static final String FLINK_RUN = "run";
+
+    /**
+     * flink sql command
+     * usage: sql-client.sh -i <initialization file>, -f <script file>
+     */
+    public static final String FLINK_SQL_COMMAND = "sql-client.sh";
+
+    /**
+     * flink run options
      */
     public static final String FLINK_YARN_CLUSTER = "yarn-cluster";
     public static final String FLINK_RUN_MODE = "-m";
@@ -32,11 +45,28 @@ public class FlinkConstants {
     public static final String FLINK_APP_NAME = "-ynm";
     public static final String FLINK_QUEUE = "-yqu";
     public static final String FLINK_TASK_MANAGE = "-yn";
-
     public static final String FLINK_JOB_MANAGE_MEM = "-yjm";
     public static final String FLINK_TASK_MANAGE_MEM = "-ytm";
     public static final String FLINK_MAIN_CLASS = "-c";
     public static final String FLINK_PARALLELISM = "-p";
     public static final String FLINK_SHUTDOWN_ON_ATTACHED_EXIT = "-sae";
 
+    public static final String FLINK_FORMAT_EXECUTION_TARGET = "set execution.target=%s";
+    public static final String FLINK_FORMAT_YARN_APPLICATION_NAME = "set yarn.application.name=%s";
+    public static final String FLINK_FORMAT_YARN_APPLICATION_QUEUE = "set yarn.application.queue=%s";
+    public static final String FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE = "set jobmanager.memory.process.size=%s";
+    public static final String FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE = "set taskmanager.memory.process.size=%s";
+    public static final String FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS = "set taskmanager.numberOfTaskSlots=%d";
+    public static final String FLINK_FORMAT_PARALLELISM_DEFAULT = "set parallelism.default=%d";
+    public static final String FLINK_SQL_SCRIPT_FILE = "-f";
+    public static final String FLINK_SQL_INIT_FILE = "-i";
+    public static final String FLINK_SQL_NEWLINE = ";\n";
+
+    // execution.target options
+    public static final String EXECUTION_TARGET_YARN_PER_JOB = "yarn-per-job";
+    public static final String EXECUTION_TARGET_LOACL = "local";
+
+    public static final String DEPLOY_MODE_CLUSTER = "cluster";
+    public static final String DEPLOY_MODE_LOCAL = "local";
+    public static final String FLINK_VERSION_BEFORE_1_10 = "<1.10";
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
index c458c24f90..dca6fb58a3 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkParameters.java
@@ -100,10 +100,20 @@ public class FlinkParameters extends AbstractParameters {
 
     /**
      * program type
-     * 0 JAVA,1 SCALA,2 PYTHON
+     * 0 JAVA,1 SCALA,2 PYTHON,3 SQL
      */
     private ProgramType programType;
 
+    /**
+     * flink sql initialization file
+     */
+    private String initScript;
+
+    /**
+     * flink sql script file
+     */
+    private String rawScript;
+
     public ResourceInfo getMainJar() {
         return mainJar;
     }
@@ -224,9 +234,30 @@ public class FlinkParameters extends AbstractParameters {
         this.flinkVersion = flinkVersion;
     }
 
+    public String getInitScript() {
+        return initScript;
+    }
+
+    public void setInitScript(String initScript) {
+        this.initScript = initScript;
+    }
+
+    public String getRawScript() {
+        return rawScript;
+    }
+
+    public void setRawScript(String rawScript) {
+        this.rawScript = rawScript;
+    }
+
     @Override
     public boolean checkParameters() {
-        return mainJar != null && programType != null;
+        /**
+         * When saving a task, the parameter cannot be empty. There are two judgments:
+         * (1) When ProgramType is SQL, rawScript cannot be empty.
+         * (2) When ProgramType is Java/Scala/Python, mainJar cannot be empty.
+         */
+        return programType != null && (rawScript != null || mainJar != null);
     }
 
     @Override
@@ -236,5 +267,4 @@ public class FlinkParameters extends AbstractParameters {
         }
         return resourceList;
     }
-
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index 5e1734b7bf..966e8a01bd 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.plugin.task.flink;
 
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+
 import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -24,26 +26,30 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class FlinkTask extends AbstractYarnTask {
 
     /**
-     *  flink command
-     *  usage: flink run [OPTIONS] <jar-file> <arguments>
-     */
-    private static final String FLINK_COMMAND = "flink";
-    private static final String FLINK_RUN = "run";
-
-    /**
-     *  flink parameters
+     * flink parameters
      */
     private FlinkParameters flinkParameters;
 
@@ -68,53 +74,310 @@ public class FlinkTask extends AbstractYarnTask {
             throw new RuntimeException("flink task params is not valid");
         }
         flinkParameters.setQueue(taskExecutionContext.getQueue());
-        setMainJarName();
 
-        if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
-            String args = flinkParameters.getMainArgs();
+        if (ProgramType.SQL != flinkParameters.getProgramType()) {
+            setMainJarName();
+        }
+    }
+
+    /**
+     * create command
+     *
+     * @return command
+     */
+    @Override
+    protected String buildCommand() {
+        List<String> args = new ArrayList<>();
+
+        if (ProgramType.SQL != flinkParameters.getProgramType()) {
+            // execute flink run [OPTIONS] <jar-file> <arguments>
+            args.add(FlinkConstants.FLINK_COMMAND);
+            args.add(FlinkConstants.FLINK_RUN);
+            args.addAll(populateFlinkOptions());
+        } else {
+            // execute sql-client.sh -f <script file>
+            args.add(FlinkConstants.FLINK_SQL_COMMAND);
+            args.addAll(populateFlinkSqlOptions());
+        }
+        String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
+        logger.info("flink task command : {}", command);
+        return command;
+    }
+
+    /**
+     * build flink options
+     *
+     * @return argument list
+     */
+    private List<String> populateFlinkOptions() {
+        List<String> args = new ArrayList<>();
+
+        String deployMode = StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ? flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
+
+        if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
+            populateFlinkOnYarnOptions(args);
+        }
+
+        // -p
+        int parallelism = flinkParameters.getParallelism();
+        if (parallelism > 0) {
+            args.add(FlinkConstants.FLINK_PARALLELISM);
+            args.add(String.format("%d", parallelism));
+        }
+
+        /**
+         * -sae
+         *
+         * If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly.
+         * The task status will be synchronized with the cluster job status.
+         */
+        args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT);
 
+        // -s -yqu -yat -yD -D
+        String others = flinkParameters.getOthers();
+        if (StringUtils.isNotEmpty(others)) {
+            args.add(others);
+        }
+
+        // -c
+        ProgramType programType = flinkParameters.getProgramType();
+        String mainClass = flinkParameters.getMainClass();
+        if (programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
+            args.add(FlinkConstants.FLINK_MAIN_CLASS);
+            args.add(flinkParameters.getMainClass());
+        }
+
+        ResourceInfo mainJar = flinkParameters.getMainJar();
+        if (mainJar != null) {
+            args.add(mainJar.getRes());
+        }
+
+        String mainArgs = flinkParameters.getMainArgs();
+        if (StringUtils.isNotEmpty(mainArgs)) {
             // combining local and global parameters
-            Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+            Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
             if (MapUtils.isEmpty(paramsMap)) {
                 paramsMap = new HashMap<>();
             }
             if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
                 paramsMap.putAll(taskExecutionContext.getParamsMap());
             }
+            args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap)));
+        }
+
+        return args;
+    }
+
+    private void populateFlinkOnYarnOptions(List<String> args) {
+        // -m yarn-cluster
+        args.add(FlinkConstants.FLINK_RUN_MODE);
+        args.add(FlinkConstants.FLINK_YARN_CLUSTER);
+
+        // -ys
+        int slot = flinkParameters.getSlot();
+        if (slot > 0) {
+            args.add(FlinkConstants.FLINK_YARN_SLOT);
+            args.add(String.format("%d", slot));
+        }
+
+        // -ynm
+        String appName = flinkParameters.getAppName();
+        if (StringUtils.isNotEmpty(appName)) {
+            args.add(FlinkConstants.FLINK_APP_NAME);
+            args.add(ArgsUtils.escape(appName));
+        }
+
+        /**
+         * -yn
+         *
+         * Note: judge flink version, the parameter -yn has removed from flink 1.10
+         */
+        String flinkVersion = flinkParameters.getFlinkVersion();
+        if (flinkVersion == null || FlinkConstants.FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
+            int taskManager = flinkParameters.getTaskManager();
+            if (taskManager > 0) {
+                args.add(FlinkConstants.FLINK_TASK_MANAGE);
+                args.add(String.format("%d", taskManager));
+            }
+        }
+
+        // -yjm
+        String jobManagerMemory = flinkParameters.getJobManagerMemory();
+        if (StringUtils.isNotEmpty(jobManagerMemory)) {
+            args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
+            args.add(jobManagerMemory);
+        }
 
-            logger.info("param Map : {}", paramsMap);
-            args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
-            logger.info("param args : {}", args);
-            flinkParameters.setMainArgs(args);
+        // -ytm
+        String taskManagerMemory = flinkParameters.getTaskManagerMemory();
+        if (StringUtils.isNotEmpty(taskManagerMemory)) {
+            args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
+            args.add(taskManagerMemory);
+        }
+
+        // -yqu
+        String others = flinkParameters.getOthers();
+        if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
+            String queue = flinkParameters.getQueue();
+            if (StringUtils.isNotEmpty(queue)) {
+                args.add(FlinkConstants.FLINK_QUEUE);
+                args.add(queue);
+            }
         }
     }
 
     /**
-     * create command
-     * @return command
+     * build flink sql options
+     *
+     * @return argument list
      */
-    @Override
-    protected String buildCommand() {
-        // flink run [OPTIONS] <jar-file> <arguments>
+    private List<String> populateFlinkSqlOptions() {
         List<String> args = new ArrayList<>();
+        List<String> defalutOptions = new ArrayList<>();
 
-        args.add(FLINK_COMMAND);
-        args.add(FLINK_RUN);
-        logger.info("flink task args : {}", args);
-        // other parameters
-        args.addAll(FlinkArgsUtils.buildArgs(flinkParameters));
+        String deployMode = StringUtils.isNotEmpty(flinkParameters.getDeployMode()) ? flinkParameters.getDeployMode() : FlinkConstants.DEPLOY_MODE_CLUSTER;
 
-        String command = ParameterUtils
-                .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
+        /**
+         * Currently flink sql on yarn only supports yarn-per-job mode
+         */
+        if (!FlinkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
+            populateFlinkSqlOnYarnOptions(defalutOptions);
+        } else {
+            // execution.target
+            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.EXECUTION_TARGET_LOACL));
+        }
 
-        logger.info("flink task command : {}", command);
+        // parallelism.default
+        int parallelism = flinkParameters.getParallelism();
+        if (parallelism > 0) {
+            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_PARALLELISM_DEFAULT, parallelism));
+        }
 
-        return command;
+        // -i
+        args.add(FlinkConstants.FLINK_SQL_INIT_FILE);
+        args.add(generateInitScriptFile(StringUtils.join(defalutOptions, FlinkConstants.FLINK_SQL_NEWLINE).concat(FlinkConstants.FLINK_SQL_NEWLINE)));
+
+        // -f
+        args.add(FlinkConstants.FLINK_SQL_SCRIPT_FILE);
+        args.add(generateScriptFile());
+
+        String others = flinkParameters.getOthers();
+        if (StringUtils.isNotEmpty(others)) {
+            args.add(others);
+        }
+        return args;
+    }
+
+    private void populateFlinkSqlOnYarnOptions(List<String> defalutOptions) {
+        // execution.target
+        defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_EXECUTION_TARGET, FlinkConstants.EXECUTION_TARGET_YARN_PER_JOB));
+
+        // taskmanager.numberOfTaskSlots
+        int slot = flinkParameters.getSlot();
+        if (slot > 0) {
+            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_NUMBEROFTASKSLOTS, slot));
+        }
+
+        // yarn.application.name
+        String appName = flinkParameters.getAppName();
+        if (StringUtils.isNotEmpty(appName)) {
+            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_NAME, ArgsUtils.escape(appName)));
+        }
+
+        // jobmanager.memory.process.size
+        String jobManagerMemory = flinkParameters.getJobManagerMemory();
+        if (StringUtils.isNotEmpty(jobManagerMemory)) {
+            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_JOBMANAGER_MEMORY_PROCESS_SIZE, jobManagerMemory));
+        }
+
+        // taskmanager.memory.process.size
+        String taskManagerMemory = flinkParameters.getTaskManagerMemory();
+        if (StringUtils.isNotEmpty(taskManagerMemory)) {
+            defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_TASKMANAGER_MEMORY_PROCESS_SIZE, taskManagerMemory));
+        }
+
+        // yarn.application.queue
+        String others = flinkParameters.getOthers();
+        if (StringUtils.isEmpty(others) || !others.contains(FlinkConstants.FLINK_QUEUE)) {
+            String queue = flinkParameters.getQueue();
+            if (StringUtils.isNotEmpty(queue)) {
+                defalutOptions.add(String.format(FlinkConstants.FLINK_FORMAT_YARN_APPLICATION_QUEUE, queue));
+            }
+        }
+    }
+
+    private String generateInitScriptFile(String parameters) {
+        String initScriptFileName = String.format("%s/%s_init.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+
+        File file = new File(initScriptFileName);
+        Path path = file.toPath();
+
+        if (!Files.exists(path)) {
+            Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
+            FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
+            try {
+                if (OSUtils.isWindows()) {
+                    Files.createFile(path);
+                } else {
+                    if (!file.getParentFile().exists()) {
+                        file.getParentFile().mkdirs();
+                    }
+                    Files.createFile(path, attr);
+                }
+
+                // Flink sql common parameters are written to the script file
+                logger.info("common parameters : {}", parameters);
+                Files.write(path, parameters.getBytes(), StandardOpenOption.APPEND);
+
+                // Flink init script is written to the script file
+                if (StringUtils.isNotEmpty(flinkParameters.getInitScript())) {
+                    String script = flinkParameters.getInitScript().replaceAll("\\r\\n", "\n");
+                    flinkParameters.setInitScript(script);
+                    logger.info("init script : {}", flinkParameters.getInitScript());
+                    Files.write(path, flinkParameters.getInitScript().getBytes(), StandardOpenOption.APPEND);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("generate flink sql script error", e);
+            }
+        }
+        return initScriptFileName;
+    }
+
+    private String generateScriptFile() {
+        String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+
+        File file = new File(scriptFileName);
+        Path path = file.toPath();
+
+        if (!Files.exists(path)) {
+            String script = flinkParameters.getRawScript().replaceAll("\\r\\n", "\n");
+            flinkParameters.setRawScript(script);
+
+            logger.info("raw script : {}", flinkParameters.getRawScript());
+            logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
+
+            Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
+            FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
+            try {
+                if (OSUtils.isWindows()) {
+                    Files.createFile(path);
+                } else {
+                    if (!file.getParentFile().exists()) {
+                        file.getParentFile().mkdirs();
+                    }
+                    Files.createFile(path, attr);
+                }
+                // Flink sql raw script is written to the script file
+                Files.write(path, flinkParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
+            } catch (IOException e) {
+                throw new RuntimeException("generate flink sql script error", e);
+            }
+        }
+        return scriptFileName;
     }
 
     @Override
     protected void setMainJarName() {
-        // main jar
         ResourceInfo mainJar = flinkParameters.getMainJar();
         String resourceName = getResourceNameOfMainJar(mainJar);
         mainJar.setRes(resourceName);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/ProgramType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/ProgramType.java
index a18b8e2b0b..563e34249b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/ProgramType.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/ProgramType.java
@@ -22,9 +22,10 @@ package org.apache.dolphinscheduler.plugin.task.flink;
  */
 public enum ProgramType {
     /**
-     * 0 JAVA,1 SCALA,2 PYTHON
+     * 0 JAVA,1 SCALA,2 PYTHON,3 SQL
      */
     JAVA,
     SCALA,
-    PYTHON
+    PYTHON,
+    SQL
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
new file mode 100644
index 0000000000..df1c5ef70d
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/test/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTaskTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class
+})
+@PowerMockIgnore({"javax.*"})
+public class FlinkTaskTest {
+
+    @Test
+    public void testBuildCommand() {
+        String parameters = buildFlinkParameters();
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        when(taskExecutionContext.getQueue()).thenReturn("default");
+        FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
+        flinkTask.init();
+        Assert.assertEquals(
+            "flink run " +
+                "-m yarn-cluster " +
+                "-ys 1 " +
+                "-ynm TopSpeedWindowing " +
+                "-yjm 1G " +
+                "-ytm 1G " +
+                "-yqu default " +
+                "-p 2 -sae " +
+                "-c org.apache.flink.streaming.examples.windowing.TopSpeedWindowing " +
+                "TopSpeedWindowing.jar", flinkTask.buildCommand());
+    }
+
+    @Test
+    public void testBuildCommandWithFlinkSql() {
+        String parameters = buildFlinkParametersWithFlinkSql();
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+        when(taskExecutionContext.getTaskAppId()).thenReturn("4483");
+        FlinkTask flinkTask = spy(new FlinkTask(taskExecutionContext));
+        flinkTask.init();
+        Assert.assertEquals("sql-client.sh -i /tmp/4483_init.sql -f /tmp/4483_node.sql", flinkTask.buildCommand());
+    }
+
+    private String buildFlinkParameters() {
+        ResourceInfo resource = new ResourceInfo();
+        resource.setId(2);
+        resource.setResourceName("/TopSpeedWindowing.jar");
+        resource.setRes("TopSpeedWindowing.jar");
+
+        FlinkParameters parameters = new FlinkParameters();
+        parameters.setLocalParams(Collections.emptyList());
+        parameters.setResourceList(Collections.emptyList());
+        parameters.setProgramType(ProgramType.JAVA);
+        parameters.setMainClass("org.apache.flink.streaming.examples.windowing.TopSpeedWindowing");
+        parameters.setMainJar(resource);
+        parameters.setDeployMode("cluster");
+        parameters.setAppName("TopSpeedWindowing");
+        parameters.setFlinkVersion(">=1.10");
+        parameters.setJobManagerMemory("1G");
+        parameters.setTaskManagerMemory("1G");
+        parameters.setSlot(1);
+        parameters.setTaskManager(2);
+        parameters.setParallelism(2);
+        return JSONUtils.toJsonString(parameters);
+    }
+
+    private String buildFlinkParametersWithFlinkSql() {
+        FlinkParameters parameters = new FlinkParameters();
+        parameters.setLocalParams(Collections.emptyList());
+        parameters.setInitScript("set sql-client.execution.result-mode=tableau;");
+        parameters.setRawScript("selcet 11111;");
+        parameters.setProgramType(ProgramType.SQL);
+        parameters.setMainClass(StringUtils.EMPTY);
+        parameters.setDeployMode("cluster");
+        parameters.setAppName("FlinkSQL");
+        parameters.setOthers(StringUtils.EMPTY);
+        parameters.setJobManagerMemory("1G");
+        parameters.setTaskManagerMemory("1G");
+        parameters.setParallelism(1);
+        parameters.setFlinkVersion(">=1.10");
+        return JSONUtils.toJsonString(parameters);
+    }
+}
diff --git a/dolphinscheduler-ui/src/locales/modules/en_US.ts b/dolphinscheduler-ui/src/locales/modules/en_US.ts
index 67b26f1ae6..badaf5bbf7 100644
--- a/dolphinscheduler-ui/src/locales/modules/en_US.ts
+++ b/dolphinscheduler-ui/src/locales/modules/en_US.ts
@@ -669,6 +669,8 @@ const project = {
     timeout_period_tips: 'Timeout must be a positive integer',
     script: 'Script',
     script_tips: 'Please enter script(required)',
+    init_script: 'Initialization script',
+    init_script_tips: 'Please enter initialization script',
     resources: 'Resources',
     resources_tips: 'Please select resources',
     non_resources_tips: 'Please delete all non-existent resources',
diff --git a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
index ea8a12809f..c75a83427d 100644
--- a/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
+++ b/dolphinscheduler-ui/src/locales/modules/zh_CN.ts
@@ -661,6 +661,8 @@ const project = {
     timeout_period_tips: '超时时长必须为正整数',
     script: '脚本',
     script_tips: '请输入脚本(必填)',
+    init_script: '初始化脚本',
+    init_script_tips: '请输入初始化脚本',
     resources: '资源',
     resources_tips: '请选择资源',
     no_resources_tips: '请删除所有未授权或已删除资源',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
index 9b2170ec90..ac8f4f0850 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-flink.ts
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import { computed, ref } from 'vue'
+import { computed, ref, watchEffect } from 'vue'
 import { useI18n } from 'vue-i18n'
 import { useCustomParams, useDeployMode, useMainJar, useResources } from '.'
 import type { IJsonItem } from '../types'
@@ -22,7 +22,17 @@ import type { IJsonItem } from '../types'
 export function useFlink(model: { [field: string]: any }): IJsonItem[] {
   const { t } = useI18n()
   const mainClassSpan = computed(() =>
-    model.programType === 'PYTHON' ? 0 : 24
+    model.programType === 'PYTHON' || model.programType === 'SQL' ? 0 : 24
+  )
+
+  const mainArgsSpan = computed(() => (model.programType === 'SQL' ? 0 : 24))
+
+  const scriptSpan = computed(() => (model.programType === 'SQL' ? 24 : 0))
+
+  const flinkVersionOptions = computed(() =>
+    model.programType === 'SQL'
+      ? [{ label: '>=1.13', value: '>=1.13' }]
+      : FLINK_VERSIONS
   )
 
   const taskManagerNumberSpan = computed(() =>
@@ -35,6 +45,10 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
 
   const appNameSpan = computed(() => (model.deployMode === 'cluster' ? 24 : 0))
 
+  watchEffect(() => {
+    model.flinkVersion = model.programType === 'SQL' ? '>=1.13' : '<1.10'
+  })
+
   return [
     {
       type: 'select',
@@ -59,9 +73,13 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
       },
       validate: {
         trigger: ['input', 'blur'],
-        required: model.programType !== 'PYTHON',
+        required: model.programType !== 'PYTHON' && model.programType !== 'SQL',
         validator(validate: any, value: string) {
-          if (model.programType !== 'PYTHON' && !value) {
+          if (
+            model.programType !== 'PYTHON' &&
+            !value &&
+            model.programType !== 'SQL'
+          ) {
             return new Error(t('project.node.main_class_tips'))
           }
         }
@@ -69,11 +87,33 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
     },
     useMainJar(model),
     useDeployMode(24, ref(false)),
+    {
+      type: 'editor',
+      field: 'initScript',
+      span: scriptSpan,
+      name: t('project.node.init_script'),
+      validate: {
+        trigger: ['input', 'trigger'],
+        required: false,
+        message: t('project.node.init_script_tips')
+      }
+    },
+    {
+      type: 'editor',
+      field: 'rawScript',
+      span: scriptSpan,
+      name: t('project.node.script'),
+      validate: {
+        trigger: ['input', 'trigger'],
+        required: true,
+        message: t('project.node.script_tips')
+      }
+    },
     {
       type: 'select',
       field: 'flinkVersion',
       name: t('project.node.flink_version'),
-      options: FLINK_VERSIONS,
+      options: flinkVersionOptions,
       value: model.flinkVersion,
       span: deployModeSpan
     },
@@ -178,6 +218,7 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
     {
       type: 'input',
       field: 'mainArgs',
+      span: mainArgsSpan,
       name: t('project.node.main_arguments'),
       props: {
         type: 'textarea',
@@ -214,6 +255,10 @@ const PROGRAM_TYPES = [
   {
     label: 'PYTHON',
     value: 'PYTHON'
+  },
+  {
+    label: 'SQL',
+    value: 'SQL'
   }
 ]
 
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts
index 263e4ce153..b617329c82 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-main-jar.ts
@@ -49,7 +49,9 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
   watch(
     () => model.programType,
     (value) => {
-      getMainJars(value)
+      if (value !== 'SQL') {
+        getMainJars(value)
+      }
     }
   )
 
@@ -70,7 +72,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
       trigger: ['input', 'blur'],
       required: model.programType !== 'SQL',
       validator(validate: any, value: string) {
-        if (!value) {
+        if (!value && model.programType !== 'SQL') {
           return new Error(t('project.node.main_package_tips'))
         }
       }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index dc583e7c0c..6bc512e5f4 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -343,6 +343,7 @@ export function formatParams(data: INodeData): {
           item.value = item.value || ''
           return item
         }),
+        initScript: data.initScript,
         rawScript: data.rawScript,
         resourceList: data.resourceList?.length
           ? data.resourceList.map((id: number) => ({ id }))
@@ -468,6 +469,10 @@ export function formatModel(data: ITaskData) {
     params.rawScript = data.taskParams?.rawScript
   }
 
+  if (data.taskParams?.initScript) {
+    params.initScript = data.taskParams?.initScript
+  }
+
   if (data.taskParams?.switchResult) {
     params.switchResult = data.taskParams.switchResult
     params.dependTaskList = data.taskParams.switchResult?.dependTaskList
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts
index 1d3d8daf94..acf41992c8 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts
@@ -45,6 +45,8 @@ export function useFlink({
     timeout: 30,
     programType: 'SCALA',
     deployMode: 'cluster',
+    initScript: '',
+    rawScript: '',
     flinkVersion: '<1.10',
     jobManagerMemory: '1G',
     taskManagerMemory: '2G',
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 9d9e413db1..368125f20f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -212,6 +212,7 @@ interface ITaskParams {
   mainJar?: ISourceItem
   localParams?: ILocalParam[]
   rawScript?: string
+  initScript?: string
   programType?: string
   sparkVersion?: string
   flinkVersion?: string