You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2020/06/06 13:12:51 UTC

[incubator-dolphinscheduler] branch dev-1.3.0 updated: Fixbug datax task (#2909)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
     new 7c9f596  Fixbug datax task (#2909)
7c9f596 is described below

commit 7c9f5960e5a0c2b446a5575d1703c8f644c10dce
Author: Rubik-W <39...@users.noreply.github.com>
AuthorDate: Sat Jun 6 21:12:41 2020 +0800

    Fixbug datax task (#2909)
    
    * fix:  local param bug
    
    * fix: UT bug
    
    Co-authored-by: Rubik-W <wh...@163.com>
---
 .../common/task/datax/DataxParameters.java         | 11 +++--
 .../common/utils/ParameterUtils.java               |  2 +-
 .../server/worker/task/datax/DataxTask.java        | 49 ++++++++--------------
 .../server/worker/task/datax/DataxTaskTest.java    | 11 ++---
 .../pages/dag/_source/formModel/tasks/datax.vue    | 11 +++++
 5 files changed, 40 insertions(+), 44 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
index 872b3aa..f54e107 100755
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 
@@ -31,7 +32,7 @@ public class DataxParameters extends AbstractParameters {
     /**
      * if custom json configļ¼Œeg  0, 1
      */
-    private Integer customConfig;
+    private int customConfig;
 
     /**
      * if customConfig eq 1 ,then json is usable
@@ -88,11 +89,11 @@ public class DataxParameters extends AbstractParameters {
      */
     private int jobSpeedRecord;
 
-    public Integer getCustomConfig() {
+    public int getCustomConfig() {
         return customConfig;
     }
 
-    public void setCustomConfig(Integer customConfig) {
+    public void setCustomConfig(int customConfig) {
         this.customConfig = customConfig;
     }
 
@@ -184,11 +185,9 @@ public class DataxParameters extends AbstractParameters {
         this.jobSpeedRecord = jobSpeedRecord;
     }
 
-
     @Override
     public boolean checkParameters() {
-        if (customConfig == null) return false;
-        if (customConfig == 0) {
+        if (customConfig == Flag.NO.ordinal()) {
             return dataSource != 0
                     && dataTarget != 0
                     && StringUtils.isNotEmpty(sql)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
index bbc8955..84c60db 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
@@ -48,7 +48,7 @@ public class ParameterUtils {
    * @return convert parameters place holders
    */
   public static String convertParameterPlaceholders(String parameterString, Map<String, String> parameterMap) {
-    if (StringUtils.isEmpty(parameterString)) {
+    if (StringUtils.isEmpty(parameterString) || parameterMap == null) {
       return parameterString;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index 9589a30..f636133 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DataType;
 import org.apache.dolphinscheduler.common.enums.DbType;
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
@@ -149,9 +150,16 @@ public class DataxTask extends AbstractTask {
             String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
             Thread.currentThread().setName(threadLoggerInfoName);
 
+            // combining local and global parameters
+            Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
+                    taskExecutionContext.getDefinedParams(),
+                    dataXParameters.getLocalParametersMap(),
+                    CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
+                    taskExecutionContext.getScheduleTime());
+
             // run datax process
-            String jsonFilePath = buildDataxJsonFile();
-            String shellCommandFilePath = buildShellCommandFile(jsonFilePath);
+            String jsonFilePath = buildDataxJsonFile(paramsMap);
+            String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
             CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
 
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
@@ -184,7 +192,7 @@ public class DataxTask extends AbstractTask {
      * @return datax json file name
      * @throws Exception if error throws Exception
      */
-    private String buildDataxJsonFile()
+    private String buildDataxJsonFile(Map<String, Property> paramsMap)
         throws Exception {
         // generate json
         String fileName = String.format("%s/%s_job.json",
@@ -197,26 +205,9 @@ public class DataxTask extends AbstractTask {
             return fileName;
         }
 
-
-
-        if (dataXParameters.getCustomConfig() == 1){
-
+        if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){
             json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
-
-            /**
-             *  combining local and global parameters
-             */
-            Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
-                    taskExecutionContext.getDefinedParams(),
-                    dataXParameters.getLocalParametersMap(),
-                    CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
-                    taskExecutionContext.getScheduleTime());
-            if (paramsMap != null){
-                json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
-            }
-
         }else {
-
             JSONObject job = new JSONObject();
             job.put("content", buildDataxJobContentJson());
             job.put("setting", buildDataxJobSettingJson());
@@ -227,6 +218,9 @@ public class DataxTask extends AbstractTask {
             json = root.toString();
         }
 
+        // replace placeholder
+        json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
+
         logger.debug("datax job json : {}", json);
 
         // create datax json file
@@ -359,7 +353,7 @@ public class DataxTask extends AbstractTask {
      * @return shell command file name
      * @throws Exception if error throws Exception
      */
-    private String buildShellCommandFile(String jobConfigFilePath)
+    private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap)
         throws Exception {
         // generate scripts
         String fileName = String.format("%s/%s_node.%s",
@@ -380,18 +374,9 @@ public class DataxTask extends AbstractTask {
         sbr.append(DATAX_HOME_EVN);
         sbr.append(" ");
         sbr.append(jobConfigFilePath);
-        String dataxCommand = sbr.toString();
 
-        // combining local and global parameters
         // replace placeholder
-        Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
-                taskExecutionContext.getDefinedParams(),
-                dataXParameters.getLocalParametersMap(),
-                CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
-                taskExecutionContext.getScheduleTime());
-        if (paramsMap != null) {
-            dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap));
-        }
+        String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr.toString(), ParamUtils.convert(paramsMap));
 
         logger.debug("raw script : {}", dataxCommand);
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
index a2a46ef..041f81d 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
@@ -21,9 +21,9 @@ import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 import com.alibaba.fastjson.JSONObject;
-import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
 import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
@@ -273,14 +273,15 @@ public class DataxTaskTest {
             setTaskParems(0);
             buildDataJson();
         } catch (Exception e) {
+            e.printStackTrace();
             Assert.fail(e.getMessage());
         }
     }
 
     public void buildDataJson() throws Exception {
-        Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
+        Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile", new Class[]{Map.class});
         method.setAccessible(true);
-        String filePath = (String) method.invoke(dataxTask, null);
+        String filePath = (String) method.invoke(dataxTask, new Object[]{null});
         Assert.assertNotNull(filePath);
     }
 
@@ -358,9 +359,9 @@ public class DataxTaskTest {
     public void testBuildShellCommandFile()
         throws Exception {
         try {
-            Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class);
+            Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class, Map.class);
             method.setAccessible(true);
-            Assert.assertNotNull(method.invoke(dataxTask, "test.json"));
+            Assert.assertNotNull(method.invoke(dataxTask, "test.json", null));
         }
         catch (Exception e) {
             Assert.fail(e.getMessage());
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue
index f1c9b75..9347b34 100755
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue
@@ -237,6 +237,12 @@
         this.postStatements = a
       },
       /**
+       * return localParams
+       */
+      _onLocalParams (a) {
+        this.localParams = a
+      },
+      /**
        * verification
        */
       _verification () {
@@ -246,6 +252,11 @@
             return false
           }
 
+          // localParams Subcomponent verification
+          if (!this.$refs.refLocalParams._verifProp()) {
+            return false
+          }
+
           // storage
           this.$emit('on-params', {
             customConfig: this.customConfig,