You are viewing a plain-text version of this content. The link to its canonical version is not preserved in this plain-text rendering.
Posted to commits@dolphinscheduler.apache.org by zi...@apache.org on 2022/04/27 09:44:11 UTC
[dolphinscheduler] branch dev updated: [Feature-9772][plugin/ui] support SparkSQL Task (#9790)
This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ed425d2d2d [Feature-9772][plugin/ui] support SparkSQL Task (#9790)
ed425d2d2d is described below
commit ed425d2d2d01f5f53f0d55e048d1ddd932c24b6f
Author: sq-q <83...@users.noreply.github.com>
AuthorDate: Wed Apr 27 17:44:05 2022 +0800
[Feature-9772][plugin/ui] support SparkSQL Task (#9790)
* [refactor] Dolphinscheduler sparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Dolphinscheduler SparkSQL
* [refactor] Refactor ui code and add sparksql test cases
* [refactor] refactor dolphinscheduler SparkSQL
* [refactor] refactor dolphinscheduler plugin-sparkSQL
* [refactor] refactor dolphinscheduler plugin-SparkSQL
* [refactor] dolphinscheduler plugin-SparkTaskTest
* [refactor] dolphinscheduler plugin-SparkTask
* [refactor] dolphinscheduler plugin-Spark
* [refactor] dolphinscheduler plugin-SparkTask-SparkSQL
* [refactor] dolphinscheduler plugin-spark-SparkTask
* [refactor] dolphinscheduler plugin-spark-SparkTask redefine code
---
.../dolphinscheduler/common/enums/ProgramType.java | 5 +-
.../common/enums/SparkVersion.java | 4 +-
.../plugin/task/api/AbstractYarnTask.java | 11 +-
.../plugin/task/spark/ProgramType.java | 5 +-
.../plugin/task/spark/SparkArgsUtils.java | 129 ----------------
.../plugin/task/spark/SparkConstants.java | 9 ++
.../plugin/task/spark/SparkParameters.java | 22 ++-
.../plugin/task/spark/SparkTask.java | 165 ++++++++++++++++++++-
.../plugin/task/spark/SparkVersion.java | 3 +-
.../plugin/task/spark/SparkTaskTest.java | 87 +++++++++++
.../src/service/modules/resources/types.ts | 2 +-
.../src/store/project/types.ts | 2 +-
.../task/components/node/fields/use-deploy-mode.ts | 65 ++++----
.../task/components/node/fields/use-flink.ts | 4 +-
.../task/components/node/fields/use-main-jar.ts | 9 +-
.../projects/task/components/node/fields/use-mr.ts | 18 ++-
.../task/components/node/fields/use-spark.ts | 38 ++++-
.../task/components/node/tasks/use-spark.ts | 3 +-
18 files changed, 393 insertions(+), 188 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java
index 5064b1f394..f0b245215a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ProgramType.java
@@ -22,9 +22,10 @@ package org.apache.dolphinscheduler.common.enums;
*/
public enum ProgramType {
/**
- * 0 JAVA,1 SCALA,2 PYTHON
+ * 0 JAVA,1 SCALA,2 PYTHON,3 SQL
*/
JAVA,
SCALA,
- PYTHON
+ PYTHON,
+ SQL
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
index ba5bcdd511..0092b31b77 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
@@ -24,9 +24,11 @@ public enum SparkVersion {
/**
* 0 SPARK1
* 1 SPARK2
+ * 2 SPARKSQL
*/
SPARK1(0, "SPARK1"),
- SPARK2(1, "SPARK2");
+ SPARK2(1, "SPARK2"),
+ SPARKSQL(2, "SPARKSQL");
SparkVersion(int code, String descp) {
this.code = code;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
index 860d5c13c7..aff6d253ab 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
@@ -37,8 +37,8 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
public AbstractYarnTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
- taskRequest,
- logger);
+ taskRequest,
+ logger);
}
@Override
@@ -73,7 +73,6 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
* create command
*
* @return String
- * @throws Exception exception
*/
protected abstract String buildCommand();
@@ -94,8 +93,8 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
}
return mainJar.getId() == 0
- ? mainJar.getRes()
- // when update resource maybe has error
- : mainJar.getResourceName().replaceFirst("/", "");
+ ? mainJar.getRes()
+ // when update resource maybe has error
+ : mainJar.getResourceName().replaceFirst("/", "");
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java
index 05b15118d0..e26a2ffd03 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/ProgramType.java
@@ -23,9 +23,10 @@ package org.apache.dolphinscheduler.plugin.task.spark;
public enum ProgramType {
/**
- * 0 JAVA,1 SCALA,2 PYTHON
+ * 0 JAVA,1 SCALA,2 PYTHON,3 SQL
*/
JAVA,
SCALA,
- PYTHON
+ PYTHON,
+ SQL
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkArgsUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkArgsUtils.java
deleted file mode 100644
index 1a5c662118..0000000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkArgsUtils.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.spark;
-
-import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
-import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * spark args utils
- */
-public class SparkArgsUtils {
-
- private static final String SPARK_CLUSTER = "cluster";
-
- private static final String SPARK_LOCAL = "local";
-
- private static final String SPARK_ON_YARN = "yarn";
-
- private SparkArgsUtils() {
- throw new IllegalStateException("Utility class");
- }
-
- /**
- * build args
- *
- * @param param param
- * @return argument list
- */
- public static List<String> buildArgs(SparkParameters param) {
- List<String> args = new ArrayList<>();
- args.add(SparkConstants.MASTER);
-
- String deployMode = StringUtils.isNotEmpty(param.getDeployMode()) ? param.getDeployMode() : SPARK_CLUSTER;
- if (!SPARK_LOCAL.equals(deployMode)) {
- args.add(SPARK_ON_YARN);
- args.add(SparkConstants.DEPLOY_MODE);
- }
- args.add(deployMode);
-
- ProgramType programType = param.getProgramType();
- String mainClass = param.getMainClass();
- if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
- args.add(SparkConstants.MAIN_CLASS);
- args.add(mainClass);
- }
-
- int driverCores = param.getDriverCores();
- if (driverCores > 0) {
- args.add(SparkConstants.DRIVER_CORES);
- args.add(String.format("%d", driverCores));
- }
-
- String driverMemory = param.getDriverMemory();
- if (StringUtils.isNotEmpty(driverMemory)) {
- args.add(SparkConstants.DRIVER_MEMORY);
- args.add(driverMemory);
- }
-
- int numExecutors = param.getNumExecutors();
- if (numExecutors > 0) {
- args.add(SparkConstants.NUM_EXECUTORS);
- args.add(String.format("%d", numExecutors));
- }
-
- int executorCores = param.getExecutorCores();
- if (executorCores > 0) {
- args.add(SparkConstants.EXECUTOR_CORES);
- args.add(String.format("%d", executorCores));
- }
-
- String executorMemory = param.getExecutorMemory();
- if (StringUtils.isNotEmpty(executorMemory)) {
- args.add(SparkConstants.EXECUTOR_MEMORY);
- args.add(executorMemory);
- }
-
- String appName = param.getAppName();
- if (StringUtils.isNotEmpty(appName)) {
- args.add(SparkConstants.SPARK_NAME);
- args.add(ArgsUtils.escape(appName));
- }
-
- String others = param.getOthers();
- if (!SPARK_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) {
- String queue = param.getQueue();
- if (StringUtils.isNotEmpty(queue)) {
- args.add(SparkConstants.SPARK_QUEUE);
- args.add(queue);
- }
- }
-
- // --conf --files --jars --packages
- if (StringUtils.isNotEmpty(others)) {
- args.add(others);
- }
-
- ResourceInfo mainJar = param.getMainJar();
- if (mainJar != null) {
- args.add(mainJar.getRes());
- }
-
- String mainArgs = param.getMainArgs();
- if (StringUtils.isNotEmpty(mainArgs)) {
- args.add(mainArgs);
- }
-
- return args;
- }
-
-}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
index dc6335cce0..1dacddbf83 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkConstants.java
@@ -40,6 +40,8 @@ public class SparkConstants {
public static final String DEPLOY_MODE = "--deploy-mode";
+ public static final String DEPLOY_MODE_LOCAL = "local";
+
/**
* --driver-cores NUM
*/
@@ -55,6 +57,8 @@ public class SparkConstants {
*/
public static final String MASTER = "--master";
+ public static final String SPARK_ON_YARN = "yarn";
+
/**
* --num-executors NUM
*/
@@ -70,4 +74,9 @@ public class SparkConstants {
*/
public static final String EXECUTOR_MEMORY = "--executor-memory";
+ /**
+ * -f <filename> SQL from files
+ */
+ public static final String SQL_FROM_FILE = "-f";
+
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
index 44a3d0cc81..78aed34af1 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkParameters.java
@@ -91,7 +91,7 @@ public class SparkParameters extends AbstractParameters {
/**
* program type
- * 0 JAVA,1 SCALA,2 PYTHON
+ * 0 JAVA,1 SCALA,2 PYTHON,3 SQL
*/
private ProgramType programType;
@@ -100,6 +100,11 @@ public class SparkParameters extends AbstractParameters {
*/
private String sparkVersion;
+ /**
+ * spark sql script
+ */
+ private String rawScript;
+
/**
* resource list
*/
@@ -225,9 +230,22 @@ public class SparkParameters extends AbstractParameters {
this.sparkVersion = sparkVersion;
}
+ public String getRawScript() {
+ return rawScript;
+ }
+
+ public void setRawScript(String rawScript) {
+ this.rawScript = rawScript;
+ }
+
@Override
public boolean checkParameters() {
- return mainJar != null && programType != null;
+        // The program type is mandatory, and the payload it runs must be present:
+        //  (1) SQL needs a non-blank rawScript (it is written to a file and run with spark-sql -f);
+        //  (2) JAVA/SCALA/PYTHON need a mainJar (it is passed to spark-submit).
+        // Accepting "mainJar or rawScript" regardless of the type would let an SQL task without a
+        // script (or a JAR task without a jar) pass validation and then fail with an NPE at build time.
+        if (programType == null) {
+            return false;
+        }
+        if (programType == ProgramType.SQL) {
+            return rawScript != null && !rawScript.trim().isEmpty();
+        }
+        return mainJar != null;
}
@Override
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index 54073e8fdd..abe3827858 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.plugin.task.spark;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
+
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -24,13 +26,25 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class SparkTask extends AbstractYarnTask {
@@ -65,32 +79,45 @@ public class SparkTask extends AbstractYarnTask {
throw new RuntimeException("spark task params is not valid");
}
sparkParameters.setQueue(taskExecutionContext.getQueue());
- setMainJarName();
+
+ if (sparkParameters.getProgramType() != ProgramType.SQL) {
+ setMainJarName();
+ }
}
/**
* create command
+ *
* @return command
*/
@Override
protected String buildCommand() {
- // spark-submit [options] <app jar | python file> [app arguments]
+ /**
+ * (1) spark-submit [options] <app jar | python file> [app arguments]
+ * (2) spark-sql [options] -f <filename>
+ */
List<String> args = new ArrayList<>();
// spark version
String sparkCommand = SparkVersion.SPARK2.getCommand();
+ // If the programType is non-SQL, execute bin/spark-submit
if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
sparkCommand = SparkVersion.SPARK1.getCommand();
}
+ // If the programType is SQL, execute bin/spark-sql
+ if (sparkParameters.getProgramType() == ProgramType.SQL) {
+ sparkCommand = SparkVersion.SPARKSQL.getCommand();
+ }
+
args.add(sparkCommand);
- // other parameters
- args.addAll(SparkArgsUtils.buildArgs(sparkParameters));
+ // populate spark options
+ args.addAll(populateSparkOptions());
// replace placeholder, and combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+ Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
@@ -105,6 +132,134 @@ public class SparkTask extends AbstractYarnTask {
return command;
}
+    /**
+     * Build the option list that follows the spark command (spark-submit or spark-sql).
+     *
+     * <p>Order: --master [yarn --deploy-mode] mode, --class (not for PYTHON/SQL), resource options,
+     * --name, --queue (non-local only, unless already present in "others"), the free-form "others"
+     * string, then either the main jar and its arguments (spark-submit) or "-f &lt;script file&gt;" (spark-sql).
+     *
+     * @return argument list
+     */
+    private List<String> populateSparkOptions() {
+        List<String> args = new ArrayList<>();
+        args.add(SparkConstants.MASTER);
+
+        // NOTE(review): before this change an empty deploy mode defaulted to "cluster" (see the removed
+        // SparkArgsUtils); it now defaults to "local". Confirm this is intended for existing tasks.
+        String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode() : SparkConstants.DEPLOY_MODE_LOCAL;
+        if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
+            args.add(SparkConstants.SPARK_ON_YARN);
+            args.add(SparkConstants.DEPLOY_MODE);
+        }
+        args.add(deployMode);
+
+        ProgramType programType = sparkParameters.getProgramType();
+        String mainClass = sparkParameters.getMainClass();
+        if (programType != ProgramType.PYTHON && programType != ProgramType.SQL && StringUtils.isNotEmpty(mainClass)) {
+            args.add(SparkConstants.MAIN_CLASS);
+            args.add(mainClass);
+        }
+
+        populateSparkResourceDefinitions(args);
+
+        String appName = sparkParameters.getAppName();
+        if (StringUtils.isNotEmpty(appName)) {
+            args.add(SparkConstants.SPARK_NAME);
+            args.add(ArgsUtils.escape(appName));
+        }
+
+        String others = sparkParameters.getOthers();
+        if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode) && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_QUEUE))) {
+            String queue = sparkParameters.getQueue();
+            if (StringUtils.isNotEmpty(queue)) {
+                args.add(SparkConstants.SPARK_QUEUE);
+                args.add(queue);
+            }
+        }
+
+        // --conf --files --jars --packages
+        if (StringUtils.isNotEmpty(others)) {
+            args.add(others);
+        }
+
+        if (programType == ProgramType.SQL) {
+            // bin/spark-sql -f fileName
+            args.add(SparkConstants.SQL_FROM_FILE);
+            args.add(generateScriptFile());
+        } else {
+            // bin/spark-submit [options] <app jar | python file> [app arguments]
+            // The removed SparkArgsUtils guarded against a missing main jar; keep that guard
+            // instead of failing with a NullPointerException.
+            ResourceInfo mainJar = sparkParameters.getMainJar();
+            if (mainJar != null) {
+                args.add(mainJar.getRes());
+            }
+            String mainArgs = sparkParameters.getMainArgs();
+            if (StringUtils.isNotEmpty(mainArgs)) {
+                args.add(mainArgs);
+            }
+        }
+        return args;
+    }
+
+ /**
+ * Append the resource options --driver-cores, --driver-memory, --num-executors,
+ * --executor-cores and --executor-memory to {@code args}. Core/executor counts are added
+ * only when greater than zero; memory values only when non-empty.
+ *
+ * @param args argument list to append to (mutated in place)
+ */
+ private void populateSparkResourceDefinitions(List<String> args) {
+ int driverCores = sparkParameters.getDriverCores();
+ if (driverCores > 0) {
+ args.add(SparkConstants.DRIVER_CORES);
+ args.add(String.format("%d", driverCores));
+ }
+
+ String driverMemory = sparkParameters.getDriverMemory();
+ if (StringUtils.isNotEmpty(driverMemory)) {
+ args.add(SparkConstants.DRIVER_MEMORY);
+ args.add(driverMemory);
+ }
+
+ int numExecutors = sparkParameters.getNumExecutors();
+ if (numExecutors > 0) {
+ args.add(SparkConstants.NUM_EXECUTORS);
+ args.add(String.format("%d", numExecutors));
+ }
+
+ int executorCores = sparkParameters.getExecutorCores();
+ if (executorCores > 0) {
+ args.add(SparkConstants.EXECUTOR_CORES);
+ args.add(String.format("%d", executorCores));
+ }
+
+ String executorMemory = sparkParameters.getExecutorMemory();
+ if (StringUtils.isNotEmpty(executorMemory)) {
+ args.add(SparkConstants.EXECUTOR_MEMORY);
+ args.add(executorMemory);
+ }
+ }
+
+    /**
+     * Write the task's raw SQL script to {@code <executePath>/<taskAppId>_node.sql} and return that
+     * path, to be passed to {@code spark-sql -f}. If the file already exists it is reused unchanged.
+     *
+     * @return path of the script file
+     * @throws RuntimeException if no script is configured or the file cannot be written
+     */
+    private String generateScriptFile() {
+        String scriptFileName = String.format("%s/%s_node.sql", taskExecutionContext.getExecutePath(), taskExecutionContext.getTaskAppId());
+
+        File file = new File(scriptFileName);
+        Path path = file.toPath();
+
+        if (!Files.exists(path)) {
+            String rawScript = sparkParameters.getRawScript();
+            if (rawScript == null) {
+                throw new RuntimeException("spark sql script is empty");
+            }
+            // normalise CRLF line endings to LF before writing the script
+            String script = rawScript.replaceAll("\\r\\n", "\n");
+            sparkParameters.setRawScript(script);
+
+            logger.info("raw script : {}", sparkParameters.getRawScript());
+            logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
+
+            try {
+                // create the execute directory on every OS, not only on POSIX systems
+                File parent = file.getParentFile();
+                if (parent != null && !parent.exists()) {
+                    parent.mkdirs();
+                }
+                if (OSUtils.isWindows()) {
+                    Files.createFile(path);
+                } else {
+                    Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
+                    FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
+                    Files.createFile(path, attr);
+                }
+                // use an explicit charset: the platform default may not be UTF-8 and would corrupt non-ASCII SQL
+                Files.write(path, script.getBytes(java.nio.charset.StandardCharsets.UTF_8), StandardOpenOption.APPEND);
+            } catch (IOException e) {
+                throw new RuntimeException("generate spark sql script error", e);
+            }
+        }
+        return scriptFileName;
+    }
+
@Override
protected void setMainJarName() {
// main jar
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
index 36398c7456..02c357914b 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
@@ -22,10 +22,11 @@ public enum SparkVersion {
/**
* 0 SPARK1
* 1 SPARK2
+ * 2 SPARKSQL
*/
SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"),
SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"),
- ;
+ SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql");
private final int code;
private final String descp;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
new file mode 100644
index 0000000000..17c2ff0c4b
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.spark;
+
+import java.util.Collections;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.powermock.api.mockito.PowerMockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Unit tests for {@link SparkTask} command building.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ JSONUtils.class
+})
+@PowerMockIgnore({"javax.*"})
+
+public class SparkTaskTest {
+
+ /**
+ * Builds the command for an SQL program type and checks that spark-sql is invoked with yarn/client
+ * mode, the resource options, the app name and "-f" pointing at the generated script file.
+ *
+ * NOTE(review): buildCommand() writes /tmp/5536_node.sql via generateScriptFile() and the test never
+ * deletes it; since an existing file is reused as-is, a stale file from an earlier run is not rewritten.
+ */
+ @Test
+ public void testBuildCommandWithSparkSql() throws Exception {
+ String parameters = buildSparkParametersWithSparkSql();
+ TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+ when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+ when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+ when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+
+ SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+ sparkTask.init();
+ Assert.assertEquals(sparkTask.buildCommand(),
+ "${SPARK_HOME2}/bin/spark-sql " +
+ "--master yarn " +
+ "--deploy-mode client " +
+ "--driver-cores 1 " +
+ "--driver-memory 512M " +
+ "--num-executors 2 " +
+ "--executor-cores 2 " +
+ "--executor-memory 1G " +
+ "--name sparksql " +
+ "-f /tmp/5536_node.sql");
+ }
+
+ /**
+ * Serialize SQL-type SparkParameters (client deploy mode, explicit resources, app name "sparksql")
+ * to JSON, as returned by the mocked TaskExecutionContext#getTaskParams.
+ */
+ private String buildSparkParametersWithSparkSql() {
+ SparkParameters sparkParameters = new SparkParameters();
+ sparkParameters.setLocalParams(Collections.emptyList());
+ sparkParameters.setRawScript("selcet 11111;");
+ sparkParameters.setProgramType(ProgramType.SQL);
+ sparkParameters.setMainClass(StringUtils.EMPTY);
+ sparkParameters.setDeployMode("client");
+ sparkParameters.setAppName("sparksql");
+ sparkParameters.setOthers(StringUtils.EMPTY);
+ sparkParameters.setSparkVersion("SPARK2");
+ sparkParameters.setDriverCores(1);
+ sparkParameters.setDriverMemory("512M");
+ sparkParameters.setNumExecutors(2);
+ sparkParameters.setExecutorMemory("1G");
+ sparkParameters.setExecutorCores(2);
+ return JSONUtils.toJsonString(sparkParameters);
+ }
+
+}
diff --git a/dolphinscheduler-ui-next/src/service/modules/resources/types.ts b/dolphinscheduler-ui-next/src/service/modules/resources/types.ts
index 43a532d4e7..71edfa4943 100644
--- a/dolphinscheduler-ui-next/src/service/modules/resources/types.ts
+++ b/dolphinscheduler-ui-next/src/service/modules/resources/types.ts
@@ -66,7 +66,7 @@ interface OnlineCreateReq extends CreateReq, ContentReq {
}
interface ProgramTypeReq {
- programType: 'JAVA' | 'SCALA' | 'PYTHON'
+ programType: 'JAVA' | 'SCALA' | 'PYTHON' | 'SQL'
}
interface ListReq {
diff --git a/dolphinscheduler-ui-next/src/store/project/types.ts b/dolphinscheduler-ui-next/src/store/project/types.ts
index 99a0e67d17..147a84503e 100644
--- a/dolphinscheduler-ui-next/src/store/project/types.ts
+++ b/dolphinscheduler-ui-next/src/store/project/types.ts
@@ -18,7 +18,7 @@
import type { EditWorkflowDefinition } from '@/views/projects/workflow/components/dag/types'
import type { IOption } from '@/components/form/types'
-type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON'
+type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON' | 'SQL'
type DependentResultType = {
[key: string]: 'SUCCESS' | 'WAITING_THREAD' | 'FAILURE'
}
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts
index 13f812132d..8f87d1cfb2 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-deploy-mode.ts
@@ -14,34 +14,49 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import { useI18n } from 'vue-i18n'
-import type { IJsonItem } from '../types'
+import {ref, watchEffect} from 'vue'
+import {useI18n} from 'vue-i18n'
+import type {IJsonItem, IOption} from '../types'
-export function useDeployMode(span = 24, showClient = true): IJsonItem {
- const { t } = useI18n()
+/**
+ * Build the "deploy mode" radio field of a task form.
+ *
+ * @param span - grid span of the field (default 24)
+ * @param showClient - ref controlling whether the 'client' option is offered
+ * @param showCluster - ref controlling whether the 'cluster' option is offered
+ * @returns a radio IJsonItem whose `options` ref is refreshed by watchEffect whenever either ref changes;
+ *   the 'local' option is always offered.
+ *
+ * NOTE(review): the flags are now refs, not booleans. A caller still passing a plain boolean
+ * (the previous signature) would read `.value` as undefined and the option would be hidden.
+ */
+export function useDeployMode(span = 24, showClient = ref(true), showCluster = ref(true)): IJsonItem {
+ const {t} = useI18n()
- return {
- type: 'radio',
- field: 'deployMode',
- name: t('project.node.deploy_mode'),
- options: DEPLOY_MODES.filter((option) =>
- option.value === 'client' ? showClient : true
- ),
- span
- }
+ const deployModeOptions = ref(DEPLOY_MODES as IOption[])
+
+ // Re-filter the option list whenever showCluster / showClient change.
+ watchEffect(
+ () => {
+ deployModeOptions.value = DEPLOY_MODES.filter((option) => {
+ switch (option.value) {
+ case 'cluster':
+ return showCluster.value
+ case 'client':
+ return showClient.value
+ default:
+ return true
+ }
+ })
+ }
+ )
+ return {
+ type: 'radio',
+ field: 'deployMode',
+ name: t('project.node.deploy_mode'),
+ options: deployModeOptions,
+ span
+ }
}
export const DEPLOY_MODES = [
- {
- label: 'cluster',
- value: 'cluster'
- },
- {
- label: 'client',
- value: 'client'
- },
- {
- label: 'local',
- value: 'local'
- }
+ {
+ label: 'cluster',
+ value: 'cluster'
+ },
+ {
+ label: 'client',
+ value: 'client'
+ },
+ {
+ label: 'local',
+ value: 'local'
+ }
]
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
index d1e138211f..9b2170ec90 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-flink.ts
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import { computed } from 'vue'
+import { computed, ref } from 'vue'
import { useI18n } from 'vue-i18n'
import { useCustomParams, useDeployMode, useMainJar, useResources } from '.'
import type { IJsonItem } from '../types'
@@ -68,7 +68,7 @@ export function useFlink(model: { [field: string]: any }): IJsonItem[] {
}
},
useMainJar(model),
- useDeployMode(24, false),
+ useDeployMode(24, ref(false)),
{
type: 'select',
field: 'flinkVersion',
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts
index 41356b07d6..e8008a6a3f 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-main-jar.ts
@@ -14,8 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-import { ref, onMounted, watch } from 'vue'
+import { computed, ref, onMounted, watch } from 'vue'
import { useI18n } from 'vue-i18n'
import { queryResourceByProgramType } from '@/service/modules/resources'
import { useTaskNodeStore } from '@/store/project/task-node'
@@ -27,6 +26,9 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
const mainJarOptions = ref([] as IMainJar[])
const taskStore = useTaskNodeStore()
+ const mainJarSpan = computed(() =>
+ model.programType === 'SQL' ? 0 : 24
+ )
const getMainJars = async (programType: ProgramType) => {
const storeMainJar = taskStore.getMainJar(programType)
if (storeMainJar) {
@@ -57,6 +59,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
type: 'tree-select',
field: 'mainJar',
name: t('project.node.main_package'),
+ span: mainJarSpan,
props: {
cascade: true,
showPath: true,
@@ -67,7 +70,7 @@ export function useMainJar(model: { [field: string]: any }): IJsonItem {
},
validate: {
trigger: ['input', 'blur'],
- required: true,
+ required: model.programType !== 'SQL',
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.main_package_tips'))
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts
index ea64355072..76b89b342d 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-mr.ts
@@ -16,7 +16,6 @@
*/
import { computed } from 'vue'
import { useI18n } from 'vue-i18n'
-import { PROGRAM_TYPES } from './use-spark'
import { useCustomParams, useMainJar, useResources } from '.'
import type { IJsonItem } from '../types'
@@ -24,7 +23,7 @@ export function useMr(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const mainClassSpan = computed(() =>
- model.programType === 'PYTHON' ? 0 : 24
+ (model.programType === 'PYTHON' || model.programType === 'SQL') ? 0 : 24
)
return [
@@ -91,3 +90,18 @@ export function useMr(model: { [field: string]: any }): IJsonItem[] {
...useCustomParams({ model, field: 'localParams', isSimple: true })
]
}
+
+export const PROGRAM_TYPES = [ // JAVA/SCALA/PYTHON only; defined locally now that use-spark's PROGRAM_TYPES also offers SQL
+  {
+    label: 'JAVA',
+    value: 'JAVA'
+  },
+  {
+    label: 'SCALA',
+    value: 'SCALA'
+  },
+  {
+    label: 'PYTHON',
+    value: 'PYTHON'
+  }
+]
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts
index fe33b1cd67..b28aa359ac 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/fields/use-spark.ts
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-import { computed } from 'vue'
+import { computed, ref } from 'vue'
import { useI18n } from 'vue-i18n'
import {
useCustomParams,
@@ -32,7 +32,19 @@ import type { IJsonItem } from '../types'
export function useSpark(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const mainClassSpan = computed(() =>
- model.programType === 'PYTHON' ? 0 : 24
+ (model.programType === 'PYTHON' || model.programType === 'SQL') ? 0 : 24
+ )
+
+ const mainArgsSpan = computed(() =>
+ model.programType === 'SQL' ? 0 : 24
+ )
+
+ const rawScriptSpan = computed(() =>
+ model.programType === 'SQL' ? 24 : 0
+ )
+
+ const showCluster = computed(() =>
+ model.programType !== 'SQL'
)
return [
@@ -66,16 +78,27 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
},
validate: {
trigger: ['input', 'blur'],
- required: model.programType !== 'PYTHON',
+ required: model.programType !== 'PYTHON' && model.programType !== 'SQL',
validator(validate: any, value: string) {
- if (model.programType !== 'PYTHON' && !value) {
+ if (model.programType !== 'PYTHON' && !value && model.programType !== 'SQL') {
return new Error(t('project.node.main_class_tips'))
}
}
}
},
useMainJar(model),
- useDeployMode(),
+ {
+ type: 'editor',
+ field: 'rawScript',
+ span: rawScriptSpan,
+ name: t('project.node.script'),
+ validate: {
+ trigger: ['input', 'trigger'],
+ required: true,
+ message: t('project.node.script_tips')
+ }
+ },
+ useDeployMode(24, ref(true), showCluster),
{
type: 'input',
field: 'appName',
@@ -92,6 +115,7 @@ export function useSpark(model: { [field: string]: any }): IJsonItem[] {
{
type: 'input',
field: 'mainArgs',
+ span: mainArgsSpan,
name: t('project.node.main_arguments'),
props: {
type: 'textarea',
@@ -124,6 +148,10 @@ export const PROGRAM_TYPES = [
{
label: 'PYTHON',
value: 'PYTHON'
+ },
+ {
+ label: 'SQL',
+ value: 'SQL'
}
]
diff --git a/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts
index 520a5481dc..46061ffc9c 100644
--- a/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts
+++ b/dolphinscheduler-ui-next/src/views/projects/task/components/node/tasks/use-spark.ts
@@ -45,7 +45,8 @@ export function useSpark({
timeout: 30,
programType: 'SCALA',
sparkVersion: 'SPARK2',
- deployMode: 'cluster',
+ rawScript: '',
+ deployMode: 'local',
driverCores: 1,
driverMemory: '512M',
numExecutors: 2,