You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/09/12 02:35:39 UTC
[dolphinscheduler] branch dev updated: [Bug] [spark-sql] In spark-sql, select both SPARK1 and SPARK2 versions and execute ${SPARK_HOME2}/bin/spark-sql (#11721)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 25b78a8003 [Bug] [spark-sql] In spark-sql, select both SPARK1 and SPARK2 versions and execute ${SPARK_HOME2}/bin/spark-sql (#11721)
25b78a8003 is described below
commit 25b78a80037c4a7edb551dd04328276ebdc7efc1
Author: limaiwang <li...@163.com>
AuthorDate: Mon Sep 12 10:35:27 2022 +0800
[Bug] [spark-sql] In spark-sql, select both SPARK1 and SPARK2 versions and execute ${SPARK_HOME2}/bin/spark-sql (#11721)
Selecting a Spark version (SPARK1 or SPARK2) now runs the matching version's spark-sql binary, instead of always executing ${SPARK_HOME2}/bin/spark-sql.
---
.../spark/{SparkVersion.java => SparkCommand.java} | 26 ++++--
.../plugin/task/spark/SparkTask.java | 13 +--
.../plugin/task/spark/SparkVersion.java | 35 +-------
.../plugin/task/spark/SparkTaskTest.java | 97 ++++++++++++++++++++--
dolphinscheduler-worker/src/main/bin/start.sh | 2 +-
5 files changed, 119 insertions(+), 54 deletions(-)
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java
similarity index 65%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java
index 02c357914b..11609f37f2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java
@@ -17,16 +17,20 @@
package org.apache.dolphinscheduler.plugin.task.spark;
-public enum SparkVersion {
+public enum SparkCommand {
/**
- * 0 SPARK1
- * 1 SPARK2
- * 2 SPARKSQL
+ * 0 SPARK1SUBMIT
+ * 1 SPARK2SUBMIT
+ * 2 SPARK1SQL
+ * 3 SPARK2SQL
*/
- SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"),
- SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"),
- SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql");
+ SPARK1SUBMIT(0, "SPARK1SUBMIT", "${SPARK_HOME1}/bin/spark-submit", SparkVersion.SPARK1),
+ SPARK2SUBMIT(1, "SPARK2SUBMIT", "${SPARK_HOME2}/bin/spark-submit", SparkVersion.SPARK2),
+
+ SPARK1SQL(2, "SPARK1SQL", "${SPARK_HOME1}/bin/spark-sql", SparkVersion.SPARK1),
+
+ SPARK2SQL(3, "SPARK2SQL", "${SPARK_HOME2}/bin/spark-sql", SparkVersion.SPARK2);
private final int code;
private final String descp;
@@ -34,11 +38,13 @@ public enum SparkVersion {
* usage: spark-submit [options] <app jar | python file> [app arguments]
*/
private final String command;
+ private final SparkVersion sparkVersion;
- SparkVersion(int code, String descp, String command) {
+ SparkCommand(int code, String descp, String command, SparkVersion sparkVersion) {
this.code = code;
this.descp = descp;
this.command = command;
+ this.sparkVersion = sparkVersion;
}
public int getCode() {
@@ -52,4 +58,8 @@ public enum SparkVersion {
public String getCommand() {
return command;
}
+
+ public SparkVersion getSparkVersion() {
+ return sparkVersion;
+ }
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index 01db342661..2e40ecf696 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
@@ -42,7 +41,6 @@ import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -100,16 +98,19 @@ public class SparkTask extends AbstractYarnTask {
List<String> args = new ArrayList<>();
// spark version
- String sparkCommand = SparkVersion.SPARK2.getCommand();
+ String sparkCommand = SparkCommand.SPARK2SUBMIT.getCommand();
// If the programType is non-SQL, execute bin/spark-submit
- if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
- sparkCommand = SparkVersion.SPARK1.getCommand();
+ if (SparkCommand.SPARK1SUBMIT.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
+ sparkCommand = SparkCommand.SPARK1SUBMIT.getCommand();
}
// If the programType is SQL, execute bin/spark-sql
if (sparkParameters.getProgramType() == ProgramType.SQL) {
- sparkCommand = SparkVersion.SPARKSQL.getCommand();
+ sparkCommand = SparkCommand.SPARK2SQL.getCommand();
+ if (SparkCommand.SPARK1SQL.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
+ sparkCommand = SparkCommand.SPARK1SQL.getCommand();
+ }
}
args.add(sparkCommand);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
index 02c357914b..baafafe2e4 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
@@ -18,38 +18,5 @@
package org.apache.dolphinscheduler.plugin.task.spark;
public enum SparkVersion {
-
- /**
- * 0 SPARK1
- * 1 SPARK2
- * 2 SPARKSQL
- */
- SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"),
- SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"),
- SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql");
-
- private final int code;
- private final String descp;
- /**
- * usage: spark-submit [options] <app jar | python file> [app arguments]
- */
- private final String command;
-
- SparkVersion(int code, String descp, String command) {
- this.code = code;
- this.descp = descp;
- this.command = command;
- }
-
- public int getCode() {
- return code;
- }
-
- public String getDescp() {
- return descp;
- }
-
- public String getCommand() {
- return command;
- }
+ SPARK1, SPARK2
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
index 9d5565ab95..4edd28fd1e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.spark;
import java.util.Collections;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.junit.Assert;
@@ -42,8 +43,8 @@ import static org.powermock.api.mockito.PowerMockito.when;
public class SparkTaskTest {
@Test
- public void testBuildCommandWithSparkSql() throws Exception {
- String parameters = buildSparkParametersWithSparkSql();
+ public void testBuildCommandWithSpark2Sql() throws Exception {
+ String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK2");
TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
@@ -63,22 +64,108 @@ public class SparkTaskTest {
"--name sparksql " +
"-f /tmp/5536_node.sql");
}
+ @Test
+ public void testBuildCommandWithSpark1Sql() throws Exception {
+ String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK1");
+ TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+ when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+ when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+ when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+ SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+ sparkTask.init();
+ Assert.assertEquals(sparkTask.buildCommand(),
+ "${SPARK_HOME1}/bin/spark-sql " +
+ "--master yarn " +
+ "--deploy-mode client " +
+ "--driver-cores 1 " +
+ "--driver-memory 512M " +
+ "--num-executors 2 " +
+ "--executor-cores 2 " +
+ "--executor-memory 1G " +
+ "--name sparksql " +
+ "-f /tmp/5536_node.sql");
+ }
- private String buildSparkParametersWithSparkSql() {
+ @Test
+ public void testBuildCommandWithSpark2Submit() throws Exception {
+ String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK2");
+ TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+ when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+ when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+ when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+ SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+ sparkTask.init();
+ Assert.assertEquals(sparkTask.buildCommand(),
+ "${SPARK_HOME2}/bin/spark-submit " +
+ "--master yarn " +
+ "--deploy-mode client " +
+ "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
+ "--driver-cores 1 " +
+ "--driver-memory 512M " +
+ "--num-executors 2 " +
+ "--executor-cores 2 " +
+ "--executor-memory 1G " +
+ "--name spark " +
+ "lib/dolphinscheduler-task-spark.jar");
+ }
+ @Test
+ public void testBuildCommandWithSpark1Submit() throws Exception {
+ String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK1");
+ TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+ when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+ when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+ when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+ SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+ sparkTask.init();
+ Assert.assertEquals(sparkTask.buildCommand(),
+ "${SPARK_HOME1}/bin/spark-submit " +
+ "--master yarn " +
+ "--deploy-mode client " +
+ "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
+ "--driver-cores 1 " +
+ "--driver-memory 512M " +
+ "--num-executors 2 " +
+ "--executor-cores 2 " +
+ "--executor-memory 1G " +
+ "--name spark " +
+ "lib/dolphinscheduler-task-spark.jar");
+ }
+ private String buildSparkParametersWithSparkSql(ProgramType programType, String sparkVersion) {
SparkParameters sparkParameters = new SparkParameters();
sparkParameters.setLocalParams(Collections.emptyList());
sparkParameters.setRawScript("selcet 11111;");
- sparkParameters.setProgramType(ProgramType.SQL);
+ sparkParameters.setProgramType(programType);
sparkParameters.setMainClass("");
sparkParameters.setDeployMode("client");
sparkParameters.setAppName("sparksql");
sparkParameters.setOthers("");
- sparkParameters.setSparkVersion("SPARK2");
+ sparkParameters.setSparkVersion(sparkVersion);
+ sparkParameters.setDriverCores(1);
+ sparkParameters.setDriverMemory("512M");
+ sparkParameters.setNumExecutors(2);
+ sparkParameters.setExecutorMemory("1G");
+ sparkParameters.setExecutorCores(2);
+ return JSONUtils.toJsonString(sparkParameters);
+ }
+ private String buildSparkParametersWithSparkSubmit(ProgramType programType, String sparkVersion) {
+ SparkParameters sparkParameters = new SparkParameters();
+ sparkParameters.setLocalParams(Collections.emptyList());
+ sparkParameters.setProgramType(programType);
+ sparkParameters.setMainClass("org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest");
+ sparkParameters.setDeployMode("client");
+ sparkParameters.setAppName("spark");
+ sparkParameters.setOthers("");
+ sparkParameters.setSparkVersion(sparkVersion);
sparkParameters.setDriverCores(1);
sparkParameters.setDriverMemory("512M");
sparkParameters.setNumExecutors(2);
sparkParameters.setExecutorMemory("1G");
sparkParameters.setExecutorCores(2);
+ ResourceInfo resourceInfo = new ResourceInfo();
+ resourceInfo.setId(1);
+ resourceInfo.setRes("dolphinscheduler-task-spark.jar");
+ resourceInfo.setResourceName("/lib/dolphinscheduler-task-spark.jar");
+ sparkParameters.setMainJar(resourceInfo);
return JSONUtils.toJsonString(sparkParameters);
}
diff --git a/dolphinscheduler-worker/src/main/bin/start.sh b/dolphinscheduler-worker/src/main/bin/start.sh
index 1a865ca6bc..56a799b500 100644
--- a/dolphinscheduler-worker/src/main/bin/start.sh
+++ b/dolphinscheduler-worker/src/main/bin/start.sh
@@ -21,7 +21,7 @@ DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
-chmod -R 700 ${DOLPHINSCHEDULER_HOME}/config
+chmod -R 700 ${DOLPHINSCHEDULER_HOME}/conf
export DOLPHINSCHEDULER_WORK_HOME=${DOLPHINSCHEDULER_HOME}
JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}