You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/09/12 02:35:39 UTC

[dolphinscheduler] branch dev updated: [Bug] [spark-sql] In spark-sql, select both SPARK1 and SPARK2 versions and execute ${SPARK_HOME2}/bin/spark-sql (#11721)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 25b78a8003 [Bug] [spark-sql] In spark-sql, select both SPARK1 and SPARK2 versions and execute ${SPARK_HOME2}/bin/spark-sql (#11721)
25b78a8003 is described below

commit 25b78a80037c4a7edb551dd04328276ebdc7efc1
Author: limaiwang <li...@163.com>
AuthorDate: Mon Sep 12 10:35:27 2022 +0800

    [Bug] [spark-sql] In spark-sql, select both SPARK1 and SPARK2 versions and execute ${SPARK_HOME2}/bin/spark-sql (#11721)
    
    select the configured Spark version so that the matching version of spark-sql is executed
---
 .../spark/{SparkVersion.java => SparkCommand.java} | 26 ++++--
 .../plugin/task/spark/SparkTask.java               | 13 +--
 .../plugin/task/spark/SparkVersion.java            | 35 +-------
 .../plugin/task/spark/SparkTaskTest.java           | 97 ++++++++++++++++++++--
 dolphinscheduler-worker/src/main/bin/start.sh      |  2 +-
 5 files changed, 119 insertions(+), 54 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java
similarity index 65%
copy from dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
copy to dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java
index 02c357914b..11609f37f2 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkCommand.java
@@ -17,16 +17,20 @@
 
 package org.apache.dolphinscheduler.plugin.task.spark;
 
-public enum SparkVersion {
+public enum SparkCommand {
 
     /**
-     * 0 SPARK1
-     * 1 SPARK2
-     * 2 SPARKSQL
+     * 0 SPARK1SUBMIT
+     * 1 SPARK2SUBMIT
+     * 2 SPARK1SQL
+     * 3 SPARK2SQL
      */
-    SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"),
-    SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"),
-    SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql");
+    SPARK1SUBMIT(0, "SPARK1SUBMIT", "${SPARK_HOME1}/bin/spark-submit", SparkVersion.SPARK1),
+    SPARK2SUBMIT(1, "SPARK2SUBMIT", "${SPARK_HOME2}/bin/spark-submit", SparkVersion.SPARK2),
+
+    SPARK1SQL(2, "SPARK1SQL", "${SPARK_HOME1}/bin/spark-sql", SparkVersion.SPARK1),
+
+    SPARK2SQL(3, "SPARK2SQL", "${SPARK_HOME2}/bin/spark-sql", SparkVersion.SPARK2);
 
     private final int code;
     private final String descp;
@@ -34,11 +38,13 @@ public enum SparkVersion {
      * usage: spark-submit [options] <app jar | python file> [app arguments]
      */
     private final String command;
+    private final SparkVersion sparkVersion;
 
-    SparkVersion(int code, String descp, String command) {
+    SparkCommand(int code, String descp, String command, SparkVersion sparkVersion) {
         this.code = code;
         this.descp = descp;
         this.command = command;
+        this.sparkVersion = sparkVersion;
     }
 
     public int getCode() {
@@ -52,4 +58,8 @@ public enum SparkVersion {
     public String getCommand() {
         return command;
     }
+
+    public SparkVersion getSparkVersion() {
+        return sparkVersion;
+    }
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index 01db342661..2e40ecf696 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
 import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 import org.apache.dolphinscheduler.plugin.task.api.utils.ArgsUtils;
-import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 import org.apache.dolphinscheduler.spi.utils.StringUtils;
 
@@ -42,7 +41,6 @@ import java.nio.file.attribute.FileAttribute;
 import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -100,16 +98,19 @@ public class SparkTask extends AbstractYarnTask {
         List<String> args = new ArrayList<>();
 
         // spark version
-        String sparkCommand = SparkVersion.SPARK2.getCommand();
+        String sparkCommand = SparkCommand.SPARK2SUBMIT.getCommand();
 
         // If the programType is non-SQL, execute bin/spark-submit
-        if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) {
-            sparkCommand = SparkVersion.SPARK1.getCommand();
+        if (SparkCommand.SPARK1SUBMIT.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
+            sparkCommand = SparkCommand.SPARK1SUBMIT.getCommand();
         }
 
         // If the programType is SQL, execute bin/spark-sql
         if (sparkParameters.getProgramType() == ProgramType.SQL) {
-            sparkCommand = SparkVersion.SPARKSQL.getCommand();
+            sparkCommand = SparkCommand.SPARK2SQL.getCommand();
+            if (SparkCommand.SPARK1SQL.getSparkVersion().name().equals(sparkParameters.getSparkVersion())) {
+                sparkCommand = SparkCommand.SPARK1SQL.getCommand();
+            }
         }
 
         args.add(sparkCommand);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
index 02c357914b..baafafe2e4 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkVersion.java
@@ -18,38 +18,5 @@
 package org.apache.dolphinscheduler.plugin.task.spark;
 
 public enum SparkVersion {
-
-    /**
-     * 0 SPARK1
-     * 1 SPARK2
-     * 2 SPARKSQL
-     */
-    SPARK1(0, "SPARK1", "${SPARK_HOME1}/bin/spark-submit"),
-    SPARK2(1, "SPARK2", "${SPARK_HOME2}/bin/spark-submit"),
-    SPARKSQL(2, "SPARKSQL", "${SPARK_HOME2}/bin/spark-sql");
-
-    private final int code;
-    private final String descp;
-    /**
-     * usage: spark-submit [options] <app jar | python file> [app arguments]
-     */
-    private final String command;
-
-    SparkVersion(int code, String descp, String command) {
-        this.code = code;
-        this.descp = descp;
-        this.command = command;
-    }
-
-    public int getCode() {
-        return code;
-    }
-
-    public String getDescp() {
-        return descp;
-    }
-
-    public String getCommand() {
-        return command;
-    }
+    SPARK1, SPARK2
 }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
index 9d5565ab95..4edd28fd1e 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.spark;
 import java.util.Collections;
 
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
 import org.junit.Assert;
@@ -42,8 +43,8 @@ import static org.powermock.api.mockito.PowerMockito.when;
 public class SparkTaskTest {
 
     @Test
-    public void testBuildCommandWithSparkSql() throws Exception {
-        String parameters = buildSparkParametersWithSparkSql();
+    public void testBuildCommandWithSpark2Sql() throws Exception {
+        String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK2");
         TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
         when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
         when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
@@ -63,22 +64,108 @@ public class SparkTaskTest {
                 "--name sparksql " +
                 "-f /tmp/5536_node.sql");
     }
+    @Test
+    public void testBuildCommandWithSpark1Sql() throws Exception {
+        String parameters = buildSparkParametersWithSparkSql(ProgramType.SQL, "SPARK1");
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+        when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+        SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+        sparkTask.init();
+        Assert.assertEquals(sparkTask.buildCommand(),
+                "${SPARK_HOME1}/bin/spark-sql " +
+                        "--master yarn " +
+                        "--deploy-mode client " +
+                        "--driver-cores 1 " +
+                        "--driver-memory 512M " +
+                        "--num-executors 2 " +
+                        "--executor-cores 2 " +
+                        "--executor-memory 1G " +
+                        "--name sparksql " +
+                        "-f /tmp/5536_node.sql");
+    }
 
-    private String buildSparkParametersWithSparkSql() {
+    @Test
+    public void testBuildCommandWithSpark2Submit() throws Exception {
+        String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK2");
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+        when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+        SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+        sparkTask.init();
+        Assert.assertEquals(sparkTask.buildCommand(),
+                "${SPARK_HOME2}/bin/spark-submit " +
+                        "--master yarn " +
+                        "--deploy-mode client " +
+                        "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
+                        "--driver-cores 1 " +
+                        "--driver-memory 512M " +
+                        "--num-executors 2 " +
+                        "--executor-cores 2 " +
+                        "--executor-memory 1G " +
+                        "--name spark " +
+                        "lib/dolphinscheduler-task-spark.jar");
+    }
+    @Test
+    public void testBuildCommandWithSpark1Submit() throws Exception {
+        String parameters = buildSparkParametersWithSparkSubmit(ProgramType.SCALA, "SPARK1");
+        TaskExecutionContext taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
+        when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
+        when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
+        when(taskExecutionContext.getTaskAppId()).thenReturn("5536");
+        SparkTask sparkTask = spy(new SparkTask(taskExecutionContext));
+        sparkTask.init();
+        Assert.assertEquals(sparkTask.buildCommand(),
+                "${SPARK_HOME1}/bin/spark-submit " +
+                        "--master yarn " +
+                        "--deploy-mode client " +
+                        "--class org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest " +
+                        "--driver-cores 1 " +
+                        "--driver-memory 512M " +
+                        "--num-executors 2 " +
+                        "--executor-cores 2 " +
+                        "--executor-memory 1G " +
+                        "--name spark " +
+                        "lib/dolphinscheduler-task-spark.jar");
+    }
+    private String buildSparkParametersWithSparkSql(ProgramType programType, String sparkVersion) {
         SparkParameters sparkParameters = new SparkParameters();
         sparkParameters.setLocalParams(Collections.emptyList());
         sparkParameters.setRawScript("selcet 11111;");
-        sparkParameters.setProgramType(ProgramType.SQL);
+        sparkParameters.setProgramType(programType);
         sparkParameters.setMainClass("");
         sparkParameters.setDeployMode("client");
         sparkParameters.setAppName("sparksql");
         sparkParameters.setOthers("");
-        sparkParameters.setSparkVersion("SPARK2");
+        sparkParameters.setSparkVersion(sparkVersion);
+        sparkParameters.setDriverCores(1);
+        sparkParameters.setDriverMemory("512M");
+        sparkParameters.setNumExecutors(2);
+        sparkParameters.setExecutorMemory("1G");
+        sparkParameters.setExecutorCores(2);
+        return JSONUtils.toJsonString(sparkParameters);
+    }
+    private String buildSparkParametersWithSparkSubmit(ProgramType programType, String sparkVersion) {
+        SparkParameters sparkParameters = new SparkParameters();
+        sparkParameters.setLocalParams(Collections.emptyList());
+        sparkParameters.setProgramType(programType);
+        sparkParameters.setMainClass("org.apache.dolphinscheduler.plugin.task.spark.SparkTaskTest");
+        sparkParameters.setDeployMode("client");
+        sparkParameters.setAppName("spark");
+        sparkParameters.setOthers("");
+        sparkParameters.setSparkVersion(sparkVersion);
         sparkParameters.setDriverCores(1);
         sparkParameters.setDriverMemory("512M");
         sparkParameters.setNumExecutors(2);
         sparkParameters.setExecutorMemory("1G");
         sparkParameters.setExecutorCores(2);
+        ResourceInfo resourceInfo = new ResourceInfo();
+        resourceInfo.setId(1);
+        resourceInfo.setRes("dolphinscheduler-task-spark.jar");
+        resourceInfo.setResourceName("/lib/dolphinscheduler-task-spark.jar");
+        sparkParameters.setMainJar(resourceInfo);
         return JSONUtils.toJsonString(sparkParameters);
     }
 
diff --git a/dolphinscheduler-worker/src/main/bin/start.sh b/dolphinscheduler-worker/src/main/bin/start.sh
index 1a865ca6bc..56a799b500 100644
--- a/dolphinscheduler-worker/src/main/bin/start.sh
+++ b/dolphinscheduler-worker/src/main/bin/start.sh
@@ -21,7 +21,7 @@ DOLPHINSCHEDULER_HOME=${DOLPHINSCHEDULER_HOME:-$(cd $BIN_DIR/..; pwd)}
 
 source "$DOLPHINSCHEDULER_HOME/conf/dolphinscheduler_env.sh"
 
-chmod -R 700 ${DOLPHINSCHEDULER_HOME}/config
+chmod -R 700 ${DOLPHINSCHEDULER_HOME}/conf
 export DOLPHINSCHEDULER_WORK_HOME=${DOLPHINSCHEDULER_HOME}
 
 JAVA_OPTS=${JAVA_OPTS:-"-server -Duser.timezone=${SPRING_JACKSON_TIME_ZONE} -Xms4g -Xmx4g -Xmn2g -XX:+PrintGCDetails -Xloggc:gc.log -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=dump.hprof"}