You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/08/20 13:40:18 UTC

[dolphinscheduler] branch 1.3.8-prepare updated: [FIX-#6007]Wrong complement date (#6009)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 1.3.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.8-prepare by this push:
     new d6fc749  [FIX-#6007]Wrong complement date (#6009)
d6fc749 is described below

commit d6fc749ca39d43abc340b82562060ed8885dd761
Author: Kirs <ac...@163.com>
AuthorDate: Fri Aug 20 21:39:58 2021 +0800

    [FIX-#6007]Wrong complement date (#6009)
---
 .../server/entity/TaskExecutionContext.java        | 14 ++++++++++++
 .../server/worker/runner/TaskExecuteThread.java    | 26 +++++++++++++++++++++-
 .../server/worker/task/datax/DataxTask.java        |  9 +++++++-
 .../server/worker/task/flink/FlinkTask.java        |  9 ++++++++
 .../server/worker/task/http/HttpTask.java          |  9 ++++++++
 .../server/worker/task/mr/MapReduceTask.java       |  9 ++++++++
 .../server/worker/task/python/PythonTask.java      | 10 +++++++++
 .../server/worker/task/shell/ShellTask.java        | 22 +++++++-----------
 .../server/worker/task/sql/SqlTask.java            | 10 +++++++--
 .../server/worker/task/sqoop/SqoopTask.java        | 10 ++++++++-
 10 files changed, 109 insertions(+), 19 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 41331ab..e6df0f6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.entity;
 
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
@@ -192,6 +193,19 @@ public class TaskExecutionContext implements Serializable{
     private SqoopTaskExecutionContext sqoopTaskExecutionContext;
 
     /**
+     * business param
+     */
+    private Map<String, Property> paramsMap;
+
+    public Map<String, Property> getParamsMap() {
+        return paramsMap;
+    }
+
+    public void setParamsMap(Map<String, Property> paramsMap) {
+        this.paramsMap = paramsMap;
+    }
+
+    /**
      *  procedure TaskExecutionContext
      */
     private ProcedureTaskExecutionContext procedureTaskExecutionContext;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 81f14e7..77a775d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,14 +17,20 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import static java.util.Calendar.DAY_OF_MONTH;
+
 import com.alibaba.fastjson.JSONObject;
 import org.apache.commons.collections.MapUtils;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -128,7 +134,7 @@ public class TaskExecuteThread implements Runnable {
 
             // task init
             task.init();
-
+            preBuildBusinessParams();
             // task handle
             task.handle();
 
@@ -154,6 +160,24 @@ public class TaskExecuteThread implements Runnable {
         }
     }
 
+
+    private void preBuildBusinessParams(){
+        Map<String, Property> paramsMap = new HashMap<>();
+        // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
+        if (taskExecutionContext.getScheduleTime() != null) {
+            Date date = taskExecutionContext.getScheduleTime();
+            if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
+                date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
+            }
+            String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
+            Property p = new Property();
+            p.setValue(dateTime);
+            p.setProp(Constants.PARAMETER_DATETIME);
+            paramsMap.put(Constants.PARAMETER_DATETIME, p);
+        }
+        taskExecutionContext.setParamsMap(paramsMap);
+    }
+
     /**
      * when task finish, clear execute path.
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index afb9115..0522187 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
 
 import java.io.File;
@@ -57,6 +58,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -157,7 +159,12 @@ public class DataxTask extends AbstractTask {
                     dataXParameters.getLocalParametersMap(),
                     CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                     taskExecutionContext.getScheduleTime());
-
+            if(MapUtils.isEmpty(paramsMap)){
+                paramsMap=new HashMap<>();
+            }
+            if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+                paramsMap.putAll(taskExecutionContext.getParamsMap());
+            }
             // run datax process
             String jsonFilePath = buildDataxJsonFile(paramsMap);
             String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 4d34190..450964d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -31,7 +31,10 @@ import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 
+import org.apache.commons.collections.MapUtils;
+
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -86,6 +89,12 @@ public class FlinkTask extends AbstractYarnTask {
                     flinkParameters.getLocalParametersMap(),
                     CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                     taskExecutionContext.getScheduleTime());
+            if(MapUtils.isEmpty(paramsMap)){
+                paramsMap=new HashMap<>();
+            }
+            if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+                paramsMap.putAll(taskExecutionContext.getParamsMap());
+            }
 
             logger.info("param Map : {}", paramsMap);
             if (paramsMap != null) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index 8dc5659..acc3eab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.worker.task.http;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.Charsets;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -51,6 +53,7 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -148,6 +151,12 @@ public class HttpTask extends AbstractTask {
                 httpParameters.getLocalParametersMap(),
                 CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                 taskExecutionContext.getScheduleTime());
+        if(MapUtils.isEmpty(paramsMap)){
+            paramsMap=new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
         List<HttpProperty> httpPropertyList = new ArrayList<>();
         if(CollectionUtils.isNotEmpty(httpParameters.getHttpParams() )){
             for (HttpProperty httpProperty: httpParameters.getHttpParams()) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index f60b1cb..4988794 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -32,7 +32,10 @@ import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 
+import org.apache.commons.collections.MapUtils;
+
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -90,6 +93,12 @@ public class MapReduceTask extends AbstractYarnTask {
                 mapreduceParameters.getLocalParametersMap(),
                 CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                 taskExecutionContext.getScheduleTime());
+        if(MapUtils.isEmpty(paramsMap)){
+            paramsMap=new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
 
         if (paramsMap != null) {
             String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(),  ParamUtils.convert(paramsMap));
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index 7a66227..f8df005 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -29,8 +29,12 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
+
+import org.apache.commons.collections.MapUtils;
+
 import org.slf4j.Logger;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -120,6 +124,12 @@ public class PythonTask extends AbstractTask {
             pythonParameters.getLocalParametersMap(),
             CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
             taskExecutionContext.getScheduleTime());
+    if(MapUtils.isEmpty(paramsMap)){
+      paramsMap=new HashMap<>();
+    }
+    if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+      paramsMap.putAll(taskExecutionContext.getParamsMap());
+    }
     if (paramsMap != null){
       rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index a331765..cca2a04 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -32,6 +32,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
+
+import org.apache.commons.collections.MapUtils;
+
 import org.slf4j.Logger;
 
 import java.io.File;
@@ -138,20 +141,11 @@ public class ShellTask extends AbstractTask {
         shellParameters.getLocalParametersMap(),
         CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
         taskExecutionContext.getScheduleTime());
-    // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
-    if (taskExecutionContext.getScheduleTime() != null) {
-      if (paramsMap == null) {
-        paramsMap = new HashMap<>();
-      }
-      Date date = taskExecutionContext.getScheduleTime();
-      if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
-        date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
-      }
-      String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
-      Property p = new Property();
-      p.setValue(dateTime);
-      p.setProp(Constants.PARAMETER_DATETIME);
-      paramsMap.put(Constants.PARAMETER_DATETIME, p);
+    if(MapUtils.isEmpty(paramsMap)){
+      paramsMap=new HashMap<>();
+    }
+    if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+      paramsMap.putAll(taskExecutionContext.getParamsMap());
     }
     script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index af1a5c5..f021cb7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.server.utils.UDFUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 
 import java.sql.Connection;
@@ -181,9 +182,14 @@ public class SqlTask extends AbstractTask {
                 sqlParameters.getLocalParametersMap(),
                 CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                 taskExecutionContext.getScheduleTime());
-
+        if(MapUtils.isEmpty(paramsMap)){
+            paramsMap=new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
         // spell SQL according to the final user-defined variable
-        if(paramsMap == null){
+        if(paramsMap.isEmpty()){
             sqlBuilder.append(sql);
             return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index 00d94f0..da5898d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -28,6 +28,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
 
+import org.apache.commons.collections.MapUtils;
+
+import java.util.HashMap;
 import java.util.Map;
 
 import org.slf4j.Logger;
@@ -78,7 +81,12 @@ public class SqoopTask extends AbstractYarnTask {
             sqoopParameters.getLocalParametersMap(),
             CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
             sqoopTaskExecutionContext.getScheduleTime());
-
+        if(MapUtils.isEmpty(paramsMap)){
+            paramsMap=new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())){
+            paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap());
+        }
         if (paramsMap != null) {
             String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
             logger.info("sqoop script: {}", resultScripts);