You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/08/24 07:06:32 UTC

[dolphinscheduler] branch dev updated: [FIX-#6007]Wrong complement date (#6026)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b85793  [FIX-#6007]Wrong complement date (#6026)
7b85793 is described below

commit 7b8579310f5b11c3e8195b85873eaa920d11bc58
Author: linquan <11...@qq.com>
AuthorDate: Tue Aug 24 15:06:28 2021 +0800

    [FIX-#6007]Wrong complement date (#6026)
    
    * [FIX-#6007]Wrong complement date
    
    * [style]Wrong complement date
---
 .../server/entity/TaskExecutionContext.java        | 14 +++++++++++
 .../server/worker/runner/TaskExecuteThread.java    | 23 +++++++++++++++++-
 .../server/worker/task/datax/DataxTask.java        |  8 +++++++
 .../server/worker/task/flink/FlinkTask.java        | 15 ++++++++----
 .../server/worker/task/http/HttpTask.java          |  8 +++++++
 .../server/worker/task/mr/MapReduceTask.java       | 21 +++++++++++------
 .../server/worker/task/python/PythonTask.java      | 15 ++++++++----
 .../server/worker/task/shell/ShellTask.java        | 27 ++++++----------------
 .../server/worker/task/spark/SparkTask.java        | 15 ++++++++----
 .../server/worker/task/sql/SqlTask.java            |  8 ++++++-
 .../server/worker/task/sqoop/SqoopTask.java        | 11 ++++++++-
 11 files changed, 122 insertions(+), 43 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 7a47107..f50b638 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.entity;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
@@ -222,6 +223,19 @@ public class TaskExecutionContext implements Serializable {
     private String varPool;
 
     /**
+     * business param
+     */
+    private Map<String, Property> paramsMap;
+
+    public Map<String, Property> getParamsMap() {
+        return paramsMap;
+    }
+
+    public void setParamsMap(Map<String, Property> paramsMap) {
+        this.paramsMap = paramsMap;
+    }
+
+    /**
      * procedure TaskExecutionContext
      */
     private ProcedureTaskExecutionContext procedureTaskExecutionContext;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 50847f7..73a6638 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,6 +17,10 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
+import static java.util.Calendar.DAY_OF_MONTH;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -153,6 +157,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
             task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
             // task init
             task.init();
+            preBuildBusinessParams();
             //init varPool
             task.getParameters().setVarPool(taskExecutionContext.getVarPool());
             // task handle
@@ -182,6 +187,23 @@ public class TaskExecuteThread implements Runnable, Delayed {
         }
     }
 
+    private void preBuildBusinessParams() {
+        Map<String, Property> paramsMap = new HashMap<>();
+        // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
+        if (taskExecutionContext.getScheduleTime() != null) {
+            Date date = taskExecutionContext.getScheduleTime();
+            if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
+                date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
+            }
+            String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
+            Property p = new Property();
+            p.setValue(dateTime);
+            p.setProp(Constants.PARAMETER_DATETIME);
+            paramsMap.put(Constants.PARAMETER_DATETIME, p);
+        }
+        taskExecutionContext.setParamsMap(paramsMap);
+    }
+
     /**
      * when task finish, clear execute path.
      */
@@ -227,7 +249,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
         return globalParamsMap;
     }
 
-
     /**
      * kill task
      */
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index c30326d..aa4e2ce 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
 
 import java.io.File;
@@ -56,6 +57,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -155,6 +157,12 @@ public class DataxTask extends AbstractTask {
 
             // replace placeholder,and combine local and global parameters
             Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+            if (MapUtils.isEmpty(paramsMap)) {
+                paramsMap = new HashMap<>();
+            }
+            if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+                paramsMap.putAll(taskExecutionContext.getParamsMap());
+            }
 
             // run datax procesDataSourceService.s
             String jsonFilePath = buildDataxJsonFile(paramsMap);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 863b91a..928edc5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -30,7 +30,10 @@ import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 
+import org.apache.commons.collections.MapUtils;
+
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -81,12 +84,16 @@ public class FlinkTask extends AbstractYarnTask {
 
             // combining local and global parameters
             Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+            if (MapUtils.isEmpty(paramsMap)) {
+                paramsMap = new HashMap<>();
+            }
+            if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+                paramsMap.putAll(taskExecutionContext.getParamsMap());
+            }
 
             logger.info("param Map : {}", paramsMap);
-            if (paramsMap != null) {
-                args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
-                logger.info("param args : {}", args);
-            }
+            args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
+            logger.info("param args : {}", args);
             flinkParameters.setMainArgs(args);
         }
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index 4e34741..2c9ccc4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.Charsets;
 import org.apache.http.HttpEntity;
 import org.apache.http.ParseException;
@@ -49,6 +50,7 @@ import org.apache.http.util.EntityUtils;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -138,6 +140,12 @@ public class HttpTask extends AbstractTask {
 
         // replace placeholder,and combine local and global parameters
         Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
 
         List<HttpProperty> httpPropertyList = new ArrayList<>();
         if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index 5e8f3ca..c7fdba4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -31,7 +31,10 @@ import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 
+import org.apache.commons.collections.MapUtils;
+
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -85,14 +88,18 @@ public class MapReduceTask extends AbstractYarnTask {
 
         // replace placeholder,and combine local and global parameters
         Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
 
-        if (paramsMap != null) {
-            String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(),  ParamUtils.convert(paramsMap));
-            mapreduceParameters.setMainArgs(args);
-            if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
-                String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(),  ParamUtils.convert(paramsMap));
-                mapreduceParameters.setOthers(others);
-            }
+        String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(),  ParamUtils.convert(paramsMap));
+        mapreduceParameters.setMainArgs(args);
+        if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
+            String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(),  ParamUtils.convert(paramsMap));
+            mapreduceParameters.setOthers(others);
         }
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index 0ee480d..5ffa6ca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -30,6 +30,9 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
 
+import org.apache.commons.collections.MapUtils;
+
+import java.util.HashMap;
 import java.util.Map;
 
 import org.slf4j.Logger;
@@ -118,6 +121,12 @@ public class PythonTask extends AbstractTask {
 
         // combining local and global parameters
         Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
         
         try {
             rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);
@@ -125,10 +134,8 @@ public class PythonTask extends AbstractTask {
         catch (StringIndexOutOfBoundsException e) {
             logger.error("setShareVar field format error, raw python script : {}", rawPythonScript);
         }
-        
-        if (paramsMap != null) {
-            rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
-        }
+
+        rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
 
         logger.info("raw python script : {}", pythonParameters.getRawScript());
         logger.info("task dir : {}", taskDir);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 32c2ad1..31b7447 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -17,14 +17,10 @@
 
 package org.apache.dolphinscheduler.server.worker.task.shell;
 
-import static java.util.Calendar.DAY_OF_MONTH;
-
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
@@ -34,6 +30,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
 
+import org.apache.commons.collections.MapUtils;
+
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -41,7 +39,6 @@ import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.FileAttribute;
 import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -164,21 +161,11 @@ public class ShellTask extends AbstractTask {
     private String parseScript(String script) {
         // combining local and global parameters
         Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
-
-        // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
-        if (taskExecutionContext.getScheduleTime() != null) {
-            if (paramsMap == null) {
-                paramsMap = new HashMap<>();
-            }
-            Date date = taskExecutionContext.getScheduleTime();
-            if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
-                date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
-            }
-            String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
-            Property p = new Property();
-            p.setValue(dateTime);
-            p.setProp(Constants.PARAMETER_DATETIME);
-            paramsMap.put(Constants.PARAMETER_DATETIME, p);
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
         }
         return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index 6939439..64c60b0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -30,7 +30,10 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 
+import org.apache.commons.collections.MapUtils;
+
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -110,12 +113,14 @@ public class SparkTask extends AbstractYarnTask {
 
         // replace placeholder, and combining local and global parameters
         Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
-
-        String command = null;
-
-        if (null != paramsMap) {
-            command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
         }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
+
+        String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
 
         logger.info("spark task command: {}", command);
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 3c4b3ab..ee2265c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -169,9 +169,15 @@ public class SqlTask extends AbstractTask {
 
         // combining local and global parameters
         Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(taskExecutionContext.getParamsMap());
+        }
 
         // spell SQL according to the final user-defined variable
-        if (paramsMap == null) {
+        if (MapUtils.isEmpty(paramsMap)) {
             sqlBuilder.append(sql);
             return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index 2f3e48d..ce91199 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
 
+import org.apache.commons.collections.MapUtils;
+
+import java.util.HashMap;
 import java.util.Map;
 
 import org.slf4j.Logger;
@@ -74,8 +77,14 @@ public class SqoopTask extends AbstractYarnTask {
 
         // combining local and global parameters
         Map<String, Property> paramsMap = ParamUtils.convert(sqoopTaskExecutionContext,getParameters());
+        if (MapUtils.isEmpty(paramsMap)) {
+            paramsMap = new HashMap<>();
+        }
+        if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())) {
+            paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap());
+        }
 
-        if (paramsMap != null) {
+        if (MapUtils.isNotEmpty(paramsMap)) {
             String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
             logger.info("sqoop script: {}", resultScripts);
             return resultScripts;