Posted to commits@dolphinscheduler.apache.org by zi...@apache.org on 2022/04/27 09:44:11 UTC

[dolphinscheduler] branch dev updated: [Feature-9772][plugin/ui] support SparkSQL Task (#9790)

This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ed425d2d2d [Feature-9772][plugin/ui] support SparkSQL Task (#9790)
ed425d2d2d is described below

commit ed425d2d2d01f5f53f0d55e048d1ddd932c24b6f
Author: sq-q <83...@users.noreply.github.com>
AuthorDate: Wed Apr 27 17:44:05 2022 +0800

    [Feature-9772][plugin/ui] support SparkSQL Task (#9790)
    
    * [refactor] Dolphinscheduler sparkSQL
    
    * [refactor] Dolphinscheduler SparkSQL
    
    * [refactor] Dolphinscheduler SparkSQL
    
    * [refactor] Dolphinscheduler SparkSQL
    
    * [refactor] Dolphinscheduler SparkSQL
    
    * [refactor] Dolphinscheduler SparkSQL
    
    * [refactor] Dolphinscheduler SparkSQL
    
    * [refactor] Refactor ui code and add sparksql test cases
    
    * [refactor] refactor dolphinscheduler SparkSQL
    
    * [refactor] refactor dolphinscheduler plugin-sparkSQL
    
    * [refactor] refactor dolphinscheduler plugin-SparkSQL
    
    * [refactor] dolphinscheduler plugin-SparkTaskTest
    
    * [refactor] dolphinscheduler plugin-SparkTask
    
    * [refactor] dolphinscheduler plugin-Spark
    
    * [refactor] dolphinscheduler plugin-SparkTask-SparkSQL
    
    * [refactor] dolphinscheduler plugin-spark-SparkTask
    
    * [refactor] dolphinscheduler plugin-spark-SparkTask redefine code
---
 .../dolphinscheduler/common/enums/ProgramType.java |   5 +-
 .../common/enums/SparkVersion.java                 |   4 +-
 .../plugin/task/api/AbstractYarnTask.java          |  11 +-
 .../plugin/task/spark/ProgramType.java             |   5 +-
 .../plugin/task/spark/SparkArgsUtils.java          | 129 ----------------
 .../plugin/task/spark/SparkConstants.java          |   9 ++
 .../plugin/task/spark/SparkParameters.java         |  22 ++-
 .../plugin/task/spark/SparkTask.java               | 165 ++++++++++++++++++++-
 .../plugin/task/spark/SparkVersion.java            |   3 +-
 .../plugin/task/spark/SparkTaskTest.java           |  87 +++++++++++
 .../src/service/modules/resources/types.ts         |   2 +-
 .../src/store/project/types.ts                     |   2 +-
 .../task/components/node/fields/use-deploy-mode.ts |  65 ++++----
 .../task/components/node/fields/use-flink.ts       |   4 +-
 .../task/components/node/fields/use-main-jar.ts    |   9 +-
 .../projects/task/components/node/fields/use-mr.ts |  18 ++-
 .../task/components/node/fields/use-spark.ts       |  38 ++++-
 .../task/components/node/tasks/use-spark.ts        |   3 +-
 18 files changed, 393 insertions(+), 188 deletions(-)
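
In short: when ProgramType is SQL, the task now launches bin/spark-sql on a generated script file instead of bin/spark-submit on a jar. The resulting command shape, using the values from the new SparkTaskTest below (the actual prefix depends on SPARK_HOME2 and the file path on the task's execute path):

    ${SPARK_HOME2}/bin/spark-sql --master yarn --deploy-mode client --driver-cores 1 --driver-memory 512M --num-executors 2 --executor-cores 2 --executor-memory 1G --name sparksql -f /tmp/5536_node.sql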

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java
index 5064b1f394..f0b245215a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java
@@ -22,9 +22,10 @@ package org.apache.dolphinscheduler.common.enums;
  */
 public enum ProgramType {
     /**
-     * 0 JAVA,1 SCALA,2 PYTHON
+     * 0 JAVA,1 SCALA,2 PYTHON,3 SQL
      */
     JAVA,
     SCALA,
-    PYTHON
+    PYTHON,
+    SQL
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
index ba5bcdd511..0092b31b77 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
@@ -24,9 +24,11 @@ public enum SparkVersion {
     /**
      * 0 SPARK1
      * 1 SPARK2
+     * 2 SPARKSQL
      */
     SPARK1(0, "SPARK1"),
-    SPARK2(1, "SPARK2");
+    SPARK2(1, "SPARK2"),
+    SPARKSQL(2, "SPARKSQL");
 
     SparkVersion(int code, String descp) {
         this.code = code;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 860d5c13c7..aff6d253ab 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -37,8 +37,8 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
     public AbstractYarnTask(TaskExecutionContext taskRequest) {
         super(taskRequest);
         this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
-                taskRequest,
-                logger);
+            taskRequest,
+            logger);
     }
 
     @Override
@@ -73,7 +73,6 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
      * create command
      *
      * @return String
-     * @throws Exception exception
      */
     protected abstract String buildCommand();
 
@@ -94,8 +93,8 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
         }
 
         return mainJar.getId() == 0
-                ? mainJar.getRes()
-                // when update resource maybe has error
-                : mainJar.getResourceName().replaceFirst("/", "");
+            ? mainJar.getRes()
+            // when update resource maybe has error
+            : mainJar.getResourceName().replaceFirst("/", "");
     }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java
index 05b15118d0..e26a2ffd03 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java
@@ -23,9 +23,10 @@ package org.apache.dolphinscheduler.plugin.task.spark;
 public enum ProgramType {
 
     /**
-     * 0 JAVA,1 SCALA,2 PYTHON
+     * 0 JAVA,1 SCALA,2 PYTHON,3 SQL
      */
     JAVA,
     SCALA,
-    PYTHON
+    PYTHON,
+    SQL
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkArgsUtils.java
deleted file mode 100644
index 1a5c662118..0000000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkArgsUtils.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.spark;
-
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * spark args utils
- */
-public class SparkArgsUtils {
-
-    private static final String SPARK_CLUSTER = "cluster";
-
-    private static final String SPARK_LOCAL = "local";
-
-    private static final String SPARK_ON_YARN = "yarn";
-
-    private SparkArgsUtils() {
-        throw new IllegalStateException("Utility class");
-    }
-
-    /**
-     * build args
-     *
-     * @param param param
-     * @return argument list
-     */
-    public static List<String> buildArgs(SparkParameters param) {
-        List<String> args = new ArrayList<>();
-        args.add(SparkConstants.MASTER);
-
-        String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER;
-        if (!SPARK_LOCAL.equals(deployMode)) {
-            args.add(SPARK_ON_YARN);
-            args.add(SparkConstants.DEPLOY_MODE);
-        }
-        args.add(deployMode);
-
-        ProgramType programType = param.getProgramType();
-        String mainClass = param.getMainClass();
-        if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
-            args.add(SparkConstants.MAIN_CLASS);
-            args.add(mainClass);
-        }
-
-        int driverCores = param.getDriverCores();
-        if (driverCores > 0) {
-            args.add(SparkConstants.DRIVER_CORES);
-            args.add(String.format("%d", driverCores));
-        }
-
-        String driverMemory = param.getDriverMemory();
-        if (StringUtils.isNotEmpty(driverMemory)) {
-            args.add(SparkConstants.DRIVER_MEMORY);
-            args.add(driverMemory);
-        }
-
-        int numExecutors = param.getNumExecutors();
-        if (numExecutors > 0) {
-            args.add(SparkConstants.NUM_EXECUTORS);
-            args.add(String.format("%d", numExecutors));
-        }
-
-        int executorCores = param.getExecutorCores();
-        if (executorCores > 0) {
-            args.add(SparkConstants.EXECUTOR_CORES);
-            args.add(String.format("%d", executorCores));
-        }
-
-        String executorMemory = param.getExecutorMemory();
-        if (StringUtils.isNotEmpty(executorMemory)) {
-            args.add(SparkConstants.EXECUTOR_MEMORY);
-            args.add(executorMemory);
-        }
-
-        String appName = param.getAppName();
-        if (StringUtils.isNotEmpty(appName)) {
-            args.add(SparkConstants.SPARK_NAME);
-            args.add(ArgsUtils.escape(appName));
-        }
-
-        String others = param.getOthers();
-        if (!SPARK_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) {
-            String queue = param.getQueue();
-            if (StringUtils.isNotEmpty(queue)) {
-                args.add(SparkConstants.SPARK_QUEUE);
-                args.add(queue);
-            }
-        }
-
-        // --conf --files --jars --packages
-        if (StringUtils.isNotEmpty(others)) {
-            args.add(others);
-        }
-
-        ResourceInfo mainJar = param.getMainJar();
-        if (mainJar != null) {
-            args.add(mainJar.getRes());
-        }
-
-        String mainArgs = param.getMainArgs();
-        if (StringUtils.isNotEmpty(mainArgs)) {
-            args.add(mainArgs);
-        }
-
-        return args;
-    }
-
-}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
index dc6335cce0..1dacddbf83 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
@@ -40,6 +40,8 @@ public class SparkConstants {
 
     public static final String DEPLOY_MODE = "--deploy-mode";
 
+    public static final String DEPLOY_MODE_LOCAL = "local";
+
     /**
      * --driver-cores NUM
      */
@@ -55,6 +57,8 @@ public class SparkConstants {
      */
     public static final String MASTER = "--master";
 
+    public static final String SPARK_ON_YARN = "yarn";
+
     /**
      * --num-executors NUM
      */
@@ -70,4 +74,9 @@ public class SparkConstants {
      */
     public static final String EXECUTOR_MEMORY = "--executor-memory";
 
+    /**
+     * -f <filename> SQL from files
+     */
+    public static final String SQL_FROM_FILE = "-f";
+
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
index 44a3d0cc81..78aed34af1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
@@ -91,7 +91,7 @@ public class SparkParameters extends AbstractParameters {
 
     /**
      * program type
-     * 0 JAVA,1 SCALA,2 PYTHON
+     * 0 JAVA,1 SCALA,2 PYTHON,3 SQL
      */
     private ProgramType programType;
 
@@ -100,6 +100,11 @@ public class SparkParameters extends AbstractParameters {
      */
     private String sparkVersion;
 
+    /**
+     * spark sql script
+     */
+    private String rawScript;
+
     /**
      * resource list
      */
@@ -225,9 +230,22 @@ public class SparkParameters extends AbstractParameters {
         this.sparkVersion = sparkVersion;
     }
 
+    public String getRawScript() {
+        return rawScript;
+    }
+
+    public void setRawScript(String rawScript) {
+        this.rawScript = rawScript;
+    }
+
     @Override
     public boolean checkParameters() {
-        return mainJar != null && programType != null;
+        /**
+         * When saving a task, programType cannot be null and either mainJar or rawScript must be set:
+         * (1) When ProgramType is SQL, rawScript cannot be empty.
+         * (2) When ProgramType is Java/Scala/Python, mainJar cannot be empty.
+         */
+        return programType != null && (mainJar != null || rawScript != null);
     }
 
     @Override
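
The new validation, as a minimal sketch (class, enum, and setter names as in the diff above; this snippet is illustrative, not part of the commit):

    SparkParameters sqlParams = new SparkParameters();
    sqlParams.setProgramType(ProgramType.SQL);
    sqlParams.setRawScript("SELECT 1;");
    // passes: programType is set and rawScript is non-null
    assert sqlParams.checkParameters();

    SparkParameters jarParams = new SparkParameters();
    jarParams.setProgramType(ProgramType.JAVA);
    // fails: neither mainJar nor rawScript is set
    assert !jarParams.checkParameters();

Note that the check does not pair SQL with rawScript specifically; a SQL task with only mainJar set would also pass.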
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index 54073e8fdd..abe3827858 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.plugin.task.spark;
 
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+
 import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -24,13 +26,25 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class SparkTask extends AbstractYarnTask {
 
@@ -65,32 +79,45 @@ public class SparkTask extends AbstractYarnTask {
             throw new RuntimeException("spark task params is not valid");
         }
         sparkParameters.setQueue(taskExecutionContext.getQueue());
-        setMainJarName();
+
+        if (sparkParameters.getProgramType() != ProgramType.SQL) {
+            setMainJarName();
+        }
     }
 
     /**
      * create command
+     *
      * @return command
      */
     @Override
     protected String buildCommand() {
-        // spark-submit [options] <app jar | python file> [app arguments]
+        /**
+         * (1) spark-submit [options] <app jar | python file> [app arguments]
+         * (2) spark-sql [options] -f <filename>
+         */
         List<String> args = new ArrayList<>();
 
         // spark version
         String sparkCommand = SparkVersion.SPARK2.getCommand();
 
+        // If the programType is non-SQL, execute bin/spark-submit
         if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
             sparkCommand = SparkVersion.SPARK1.getCommand();
         }
 
+        // If the programType is SQL, execute bin/spark-sql
+        if (sparkParameters.getProgramType() == ProgramType.SQL) {
+            sparkCommand = SparkVersion.SPARKSQL.getCommand();
+        }
+
         args.add(sparkCommand);
 
-        // other parameters
-        args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
+        // populate spark options
+        args.addAll(populateSparkOptions());
 
         // replace placeholder, and combining local and global parameters
-        Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+        Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
         if (MapUtils.isEmpty(paramsMap)) {
             paramsMap = new HashMap<>();
         }
@@ -105,6 +132,134 @@ public class SparkTask extends AbstractYarnTask {
         return command;
     }
 
+    /**
+     * build spark options
+     *
+     * @return argument list
+     */
+    private List<String> populateSparkOptions() {
+        List<String> args = new ArrayList<>();
+        args.add(SparkConstants.MASTER);
+
+        String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode() : SparkConstants.DEPLOY_MODE_LOCAL;
+        if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
+            args.add(SparkConstants.SPARK_ON_YARN);
+            args.add(SparkConstants.DEPLOY_MODE);
+        }
+        args.add(deployMode);
+
+        ProgramType programType = sparkParameters.getProgramType();
+        String mainClass = sparkParameters.getMainClass();
+        if (programType != ProgramType.PYTHON && programType != ProgramType.SQL && StringUtils.isNotEmpty(mainClass)) {
+            args.add(SparkConstants.MAIN_CLASS);
+            args.add(mainClass);
+        }
+
+        populateSparkResourceDefinitions(args);
+
+        String appName = sparkParameters.getAppName();
+        if (StringUtils.isNotEmpty(appName)) {
+            args.add(SparkConstants.SPARK_NAME);
+            args.add(ArgsUtils.escape(appName));
+        }
+
+        String others = sparkParameters.getOthers();
+        if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) {
+            String queue = sparkParameters.getQueue();
+            if (StringUtils.isNotEmpty(queue)) {
+                args.add(SparkConstants.SPARK_QUEUE);
+                args.add(queue);
+            }
+        }
+
+        // --conf --files --jars --packages
+        if (StringUtils.isNotEmpty(others)) {
+            args.add(others);
+        }
+
+        ResourceInfo mainJar = sparkParameters.getMainJar();
+        if (programType != ProgramType.SQL) {
+            args.add(mainJar.getRes());
+        }
+
+        String mainArgs = sparkParameters.getMainArgs();
+        if (programType != ProgramType.SQL && StringUtils.isNotEmpty(mainArgs)) {
+            args.add(mainArgs);
+        }
+
+        // bin/spark-sql -f fileName
+        if (ProgramType.SQL == programType) {
+            args.add(SparkConstants.SQL_FROM_FILE);
+            args.add(generateScriptFile());
+        }
+        return args;
+    }
+
+    private void populateSparkResourceDefinitions(List<String> args) {
+        int driverCores = sparkParameters.getDriverCores();
+        if (driverCores > 0) {
+            args.add(SparkConstants.DRIVER_CORES);
+            args.add(String.format("%d", driverCores));
+        }
+
+        String driverMemory = sparkParameters.getDriverMemory();
+        if (StringUtils.isNotEmpty(driverMemory)) {
+            args.add(SparkConstants.DRIVER_MEMORY);
+            args.add(driverMemory);
+        }
+
+        int numExecutors = sparkParameters.getNumExecutors();
+        if (numExecutors > 0) {
+            args.add(SparkConstants.NUM_EXECUTORS);
+            args.add(String.format("%d", numExecutors));
+        }
+
+        int executorCores = sparkParameters.getExecutorCores();
+        if (executorCores > 0) {
+            args.add(SparkConstants.EXECUTOR_CORES);
+            args.add(String.format("%d", executorCores));
+        }
+
+        String executorMemory = sparkParameters.getExecutorMemory();
+        if (StringUtils.isNotEmpty(executorMemory)) {
+            args.add(SparkConstants.EXECUTOR_MEMORY);
+            args.add(executorMemory);
+        }
+    }
+
+    private String generateScriptFile() {
+        String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+
+        File file = new File(scriptFileName);
+        Path path = file.toPath();
+
+        if (!Files.exists(path)) {
+            String script = sparkParameters.getRawScript().replaceAll("\\r\\n", "\n");
+            sparkParameters.setRawScript(script);
+
+            logger.info("raw script : {}", sparkParameters.getRawScript());
+            logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
+
+            Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
+            FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
+            try {
+                if (OSUtils.isWindows()) {
+                    Files.createFile(path);
+                } else {
+                    if (!file.getParentFile().exists()) {
+                        file.getParentFile().mkdirs();
+                    }
+                    Files.createFile(path, attr);
+                }
+                Files.write(path, sparkParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
+            } catch (IOException e) {
+                throw new RuntimeException("generate spark sql script error", e);
+            }
+
+        }
+        return scriptFileName;
+    }
+
     @Override
     protected void setMainJarName() {
         // main jar
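
With the inputs used in SparkTaskTest below (execute path /tmp, task app id 5536), generateScriptFile() writes the raw script to /tmp/5536_node.sql and passes that path to spark-sql via -f. A minimal standalone sketch of the same file-creation idea (hypothetical path and script; RWXR_XR_X from TaskConstants is assumed to be the POSIX string "rwxr-xr-x"):

    Path path = new File("/tmp/5536_node.sql").toPath();
    if (!Files.exists(path)) {
        // create the script file as rwxr-xr-x, then append the SQL text
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x");
        Files.createFile(path, PosixFilePermissions.asFileAttribute(perms));
        Files.write(path, "SELECT 1;".getBytes(), StandardOpenOption.APPEND);
    }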
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
index 36398c7456..02c357914b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
@@ -22,10 +22,11 @@ public enum SparkVersion {
     /**
      * 0 SPARK1
      * 1 SPARK2
+     * 2 SPARKSQL
      */
     SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"),
     SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"),
-    ;
+    SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql");
 
     private final int code;
     private final String descp;
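
A one-line sanity check of the new entry (getCommand() as used in SparkTask#buildCommand above; illustrative only):

    assert "${SPARK_HOME2}/bin/spark-sql".equals(SparkVersion.SPARKSQL.getCommand());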
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
new file mode 100644
index 0000000000..17c2ff0c4b
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.spark;
+
+import java.util.Collections;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+    JSONUtils.class
+})
+@PowerMockIgnore({"javax.*"})
+
+public class SparkTaskTest {
+
+    @Test
+    public void testBuildCommandWithSparkSql() throws Exception {
+        String parameters = buildSparkParametersWithSparkSql();
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+        when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+
+        SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+        sparkTask.init();
+        Assert.assertEquals(sparkTask.buildCommand(),
+            "${SPARK_HOME2}/bin/spark-sql " +
+                "--master yarn " +
+                "--deploy-mode client " +
+                "--driver-cores 1 " +
+                "--driver-memory 512M " +
+                "--num-executors 2 " +
+                "--executor-cores 2 " +
+                "--executor-memory 1G " +
+                "--name sparksql " +
+                "-f /tmp/5536_node.sql");
+    }
+
+    private String buildSparkParametersWithSparkSql() {
+        SparkParameters sparkParameters = new SparkParameters();
+        sparkParameters.setLocalParams(Collections.emptyList());
+        sparkParameters.setRawScript("select 11111;");
+        sparkParameters.setProgramType(ProgramType.SQL);
+        sparkParameters.setMainClass(StringUtils.EMPTY);
+        sparkParameters.setDeployMode("client");
+        sparkParameters.setAppName("sparksql");
+        sparkParameters.setOthers(StringUtils.EMPTY);
+        sparkParameters.setSparkVersion("SPARK2");
+        sparkParameters.setDriverCores(1);
+        sparkParameters.setDriverMemory("512M");
+        sparkParameters.setNumExecutors(2);
+        sparkParameters.setExecutorMemory("1G");
+        sparkParameters.setExecutorCores(2);
+        return JSONUtils.toJsonString(sparkParameters);
+    }
+
+}
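
To run just this test locally, a standard Maven invocation from the repository root should work (module path as in this diff; untested here):

    mvn test -pl dolphinscheduler-task-plugin/dolphinscheduler-task-spark -Dtest=SparkTaskTest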
diff --git a/dolphinscheduler-ui-next/src/service/modules/resources/types.ts b/dolphinscheduler-ui-next/src/service/modules/resources/types.ts
index 43a532d4e7..71edfa4943 100644
--- a/dolphinscheduler-ui-next/src/service/modules/resources/types.ts
+++ b/dolphinscheduler-ui-next/src/service/modules/resources/types.ts
@@ -66,7 +66,7 @@ interface OnlineCreateReq extends CreateReq, ContentReq {
 }
 
 interface ProgramTypeReq {
-  programType: 'JAVA' | 'SCALA' | 'PYTHON'
+  programType: 'JAVA' | 'SCALA' | 'PYTHON' | 'SQL'
 }
 
 interface ListReq {
diff --git a/dolphinscheduler-ui-next/src/store/project/types.ts b/dolphinscheduler-ui-next/src/store/project/types.ts
index 99a0e67d17..147a84503e 100644
--- a/dolphinscheduler-ui-next/src/store/project/types.ts
+++ b/dolphinscheduler-ui-next/src/store/project/types.ts
@@ -18,7 +18,7 @@
 import type { EditWorkflowDefinition } from '@/views/projects/workflow/components/dag/types'
 import type { IOption } from '@/components/form/types'
 
-type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON'
+type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON' | 'SQL'
 type DependentResultType = {
   [key: string]: 'SUCCESS' | 'WAITING_THREAD' | 'FAILURE'
 }
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts
index 13f812132d..8f87d1cfb2 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts
@@ -14,34 +14,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import { useI18n } from 'vue-i18n'
-import type { IJsonItem } from '../types'
+import {ref, watchEffect} from 'vue'
+import {useI18n} from 'vue-i18n'
+import type {IJsonItem, IOption} from '../types'
 
-export function useDeployMode(span = 24, showClient = true): IJsonItem {
-  const { t } = useI18n()
+export function useDeployMode(span = 24, showClient = ref(true), showCluster = ref(true)): IJsonItem {
+    const {t} = useI18n()
 
-  return {
-    type: 'radio',
-    field: 'deployMode',
-    name: t('project.node.deploy_mode'),
-    options: DEPLOY_MODES.filter((option) =>
-      option.value === 'client' ? showClient : true
-    ),
-    span
-  }
+    const deployModeOptions = ref(DEPLOY_MODES as IOption[])
+
+    watchEffect(
+        () => {
+            deployModeOptions.value = DEPLOY_MODES.filter((option) => {
+                switch (option.value) {
+                    case 'cluster':
+                        return showCluster.value
+                    case 'client':
+                        return showClient.value
+                    default:
+                        return true
+                }
+            })
+        }
+    )
+    return {
+        type: 'radio',
+        field: 'deployMode',
+        name: t('project.node.deploy_mode'),
+        options: deployModeOptions,
+        span
+    }
 }
 
 export const DEPLOY_MODES = [
-  {
-    label: 'cluster',
-    value: 'cluster'
-  },
-  {
-    label: 'client',
-    value: 'client'
-  },
-  {
-    label: 'local',
-    value: 'local'
-  }
+    {
+        label: 'cluster',
+        value: 'cluster'
+    },
+    {
+        label: 'client',
+        value: 'client'
+    },
+    {
+        label: 'local',
+        value: 'local'
+    }
 ]
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
index d1e138211f..9b2170ec90 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import { computed } from 'vue'
+import { computed, ref } from 'vue'
 import { useI18n } from 'vue-i18n'
 import { useCustomParams, useDeployMode, useMainJar, useResources } from '.'
 import type { IJsonItem } from '../types'
@@ -68,7 +68,7 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
       }
     },
     useMainJar(model),
-    useDeployMode(24, false),
+    useDeployMode(24, ref(false)),
     {
       type: 'select',
       field: 'flinkVersion',
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts
index 41356b07d6..e8008a6a3f 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts
@@ -14,8 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-import { ref, onMounted, watch } from 'vue'
+import { computed, ref, onMounted, watch } from 'vue'
 import { useI18n } from 'vue-i18n'
 import { queryResourceByProgramType } from '@/service/modules/resources'
 import { useTaskNodeStore } from '@/store/project/task-node'
@@ -27,6 +26,9 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
   const mainJarOptions = ref([] as IMainJar[])
   const taskStore = useTaskNodeStore()
 
+  const mainJarSpan = computed(() =>
+      model.programType === 'SQL' ? 0 : 24
+  )
   const getMainJars = async (programType: ProgramType) => {
     const storeMainJar = taskStore.getMainJar(programType)
     if (storeMainJar) {
@@ -57,6 +59,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
     type: 'tree-select',
     field: 'mainJar',
     name: t('project.node.main_package'),
+    span: mainJarSpan,
     props: {
       cascade: true,
       showPath: true,
@@ -67,7 +70,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
     },
     validate: {
       trigger: ['input', 'blur'],
-      required: true,
+      required: model.programType !== 'SQL',
       validator(validate: any, value: string) {
         if (!value) {
           return new Error(t('project.node.main_package_tips'))
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts
index ea64355072..76b89b342d 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts
@@ -16,7 +16,6 @@
  */
 import { computed } from 'vue'
 import { useI18n } from 'vue-i18n'
-import { PROGRAM_TYPES } from './use-spark'
 import { useCustomParams, useMainJar, useResources } from '.'
 import type { IJsonItem } from '../types'
 
@@ -24,7 +23,7 @@ export function useMr(model: { [field: string]: any }): IJsonItem[] {
   const { t } = useI18n()
 
   const mainClassSpan = computed(() =>
-    model.programType === 'PYTHON' ? 0 : 24
+      (model.programType === 'PYTHON' || model.programType === 'SQL') ? 0 : 24
   )
 
   return [
@@ -91,3 +90,18 @@ export function useMr(model: { [field: string]: any }): IJsonItem[] {
     ...useCustomParams({ model, field: 'localParams', isSimple: true })
   ]
 }
+
+export const PROGRAM_TYPES = [
+  {
+    label: 'JAVA',
+    value: 'JAVA'
+  },
+  {
+    label: 'SCALA',
+    value: 'SCALA'
+  },
+  {
+    label: 'PYTHON',
+    value: 'PYTHON'
+  }
+]
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts
index fe33b1cd67..b28aa359ac 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import { computed } from 'vue'
+import { computed, ref } from 'vue'
 import { useI18n } from 'vue-i18n'
 import {
   useCustomParams,
@@ -32,7 +32,19 @@ import type { IJsonItem } from '../types'
 export function useSpark(model: { [field: string]: any }): IJsonItem[] {
   const { t } = useI18n()
   const mainClassSpan = computed(() =>
-    model.programType === 'PYTHON' ? 0 : 24
+      (model.programType === 'PYTHON' || model.programType === 'SQL') ? 0 : 24
+  )
+
+  const mainArgsSpan = computed(() =>
+      model.programType === 'SQL' ? 0 : 24
+  )
+
+  const rawScriptSpan = computed(() =>
+      model.programType === 'SQL' ? 24 : 0
+  )
+
+  const showCluster = computed(() =>
+      model.programType !== 'SQL'
   )
 
   return [
@@ -66,16 +78,27 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
       },
       validate: {
         trigger: ['input', 'blur'],
-        required: model.programType !== 'PYTHON',
+        required: model.programType !== 'PYTHON' && model.programType !== 'SQL',
         validator(validate: any, value: string) {
-          if (model.programType !== 'PYTHON' && !value) {
+          if (model.programType !== 'PYTHON' && !value && model.programType !== 'SQL') {
             return new Error(t('project.node.main_class_tips'))
           }
         }
       }
     },
     useMainJar(model),
-    useDeployMode(),
+    {
+      type: 'editor',
+      field: 'rawScript',
+      span: rawScriptSpan,
+      name: t('project.node.script'),
+      validate: {
+        trigger: ['input', 'trigger'],
+        required: true,
+        message: t('project.node.script_tips')
+      }
+    },
+    useDeployMode(24, ref(true), showCluster),
     {
       type: 'input',
       field: 'appName',
@@ -92,6 +115,7 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
     {
       type: 'input',
       field: 'mainArgs',
+      span: mainArgsSpan,
       name: t('project.node.main_arguments'),
       props: {
         type: 'textarea',
@@ -124,6 +148,10 @@ export const PROGRAM_TYPES = [
   {
     label: 'PYTHON',
     value: 'PYTHON'
+  },
+  {
+    label: 'SQL',
+    value: 'SQL'
   }
 ]
 
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts
index 520a5481dc..46061ffc9c 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts
@@ -45,7 +45,8 @@ export function useSpark({
     timeout: 30,
     programType: 'SCALA',
     sparkVersion: 'SPARK2',
-    deployMode: 'cluster',
+    rawScript: '',
+    deployMode: 'local',
     driverCores: 1,
     driverMemory: '512M',
     numExecutors: 2,