You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/08/24 07:06:32 UTC
[dolphinscheduler] branch dev updated: [FIX-#6007]Wrong complement date (#6026)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 7b85793 [FIX-#6007]Wrong complement date (#6026)
7b85793 is described below
commit 7b8579310f5b11c3e8195b85873eaa920d11bc58
Author: linquan <11...@qq.com>
AuthorDate: Tue Aug 24 15:06:28 2021 +0800
[FIX-#6007]Wrong complement date (#6026)
* [FIX-#6007]Wrong complement date
* [style]Wrong complement date
---
.../server/entity/TaskExecutionContext.java | 14 +++++++++++
.../server/worker/runner/TaskExecuteThread.java | 23 +++++++++++++++++-
.../server/worker/task/datax/DataxTask.java | 8 +++++++
.../server/worker/task/flink/FlinkTask.java | 15 ++++++++----
.../server/worker/task/http/HttpTask.java | 8 +++++++
.../server/worker/task/mr/MapReduceTask.java | 21 +++++++++++------
.../server/worker/task/python/PythonTask.java | 15 ++++++++----
.../server/worker/task/shell/ShellTask.java | 27 ++++++----------------
.../server/worker/task/spark/SparkTask.java | 15 ++++++++----
.../server/worker/task/sql/SqlTask.java | 8 ++++++-
.../server/worker/task/sqoop/SqoopTask.java | 11 ++++++++-
11 files changed, 122 insertions(+), 43 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 7a47107..f50b638 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
@@ -222,6 +223,19 @@ public class TaskExecutionContext implements Serializable {
private String varPool;
/**
+ * business param
+ */
+ private Map<String, Property> paramsMap;
+
+ public Map<String, Property> getParamsMap() {
+ return paramsMap;
+ }
+
+ public void setParamsMap(Map<String, Property> paramsMap) {
+ this.paramsMap = paramsMap;
+ }
+
+ /**
* procedure TaskExecutionContext
*/
private ProcedureTaskExecutionContext procedureTaskExecutionContext;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 50847f7..73a6638 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,6 +17,10 @@
package org.apache.dolphinscheduler.server.worker.runner;
+import static java.util.Calendar.DAY_OF_MONTH;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -153,6 +157,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
// task init
task.init();
+ preBuildBusinessParams();
//init varPool
task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
@@ -182,6 +187,23 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
}
+ private void preBuildBusinessParams() {
+ Map<String, Property> paramsMap = new HashMap<>();
+ // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
+ if (taskExecutionContext.getScheduleTime() != null) {
+ Date date = taskExecutionContext.getScheduleTime();
+ if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
+ date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
+ }
+ String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
+ Property p = new Property();
+ p.setValue(dateTime);
+ p.setProp(Constants.PARAMETER_DATETIME);
+ paramsMap.put(Constants.PARAMETER_DATETIME, p);
+ }
+ taskExecutionContext.setParamsMap(paramsMap);
+ }
+
/**
* when task finish, clear execute path.
*/
@@ -227,7 +249,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
return globalParamsMap;
}
-
/**
* kill task
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index c30326d..aa4e2ce 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;
@@ -56,6 +57,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -155,6 +157,12 @@ public class DataxTask extends AbstractTask {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
// run datax procesDataSourceService.s
String jsonFilePath = buildDataxJsonFile(paramsMap);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 863b91a..928edc5 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -30,7 +30,10 @@ import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
+import org.apache.commons.collections.MapUtils;
+
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -81,12 +84,16 @@ public class FlinkTask extends AbstractYarnTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
logger.info("param Map : {}", paramsMap);
- if (paramsMap != null) {
- args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
- logger.info("param args : {}", args);
- }
+ args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
+ logger.info("param args : {}", args);
flinkParameters.setMainArgs(args);
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index 4e34741..2c9ccc4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.Charsets;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
@@ -49,6 +50,7 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -138,6 +140,12 @@ public class HttpTask extends AbstractTask {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
List<HttpProperty> httpPropertyList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index 5e8f3ca..c7fdba4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -31,7 +31,10 @@ import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
+import org.apache.commons.collections.MapUtils;
+
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -85,14 +88,18 @@ public class MapReduceTask extends AbstractYarnTask {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
- if (paramsMap != null) {
- String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
- mapreduceParameters.setMainArgs(args);
- if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
- String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(), ParamUtils.convert(paramsMap));
- mapreduceParameters.setOthers(others);
- }
+ String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
+ mapreduceParameters.setMainArgs(args);
+ if (mapreduceParameters.getProgramType() != null && mapreduceParameters.getProgramType() == ProgramType.PYTHON) {
+ String others = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getOthers(), ParamUtils.convert(paramsMap));
+ mapreduceParameters.setOthers(others);
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index 0ee480d..5ffa6ca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -30,6 +30,9 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
@@ -118,6 +121,12 @@ public class PythonTask extends AbstractTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
try {
rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);
@@ -125,10 +134,8 @@ public class PythonTask extends AbstractTask {
catch (StringIndexOutOfBoundsException e) {
logger.error("setShareVar field format error, raw python script : {}", rawPythonScript);
}
-
- if (paramsMap != null) {
- rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
- }
+
+ rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
logger.info("raw python script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 32c2ad1..31b7447 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -17,14 +17,10 @@
package org.apache.dolphinscheduler.server.worker.task.shell;
-import static java.util.Calendar.DAY_OF_MONTH;
-
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
@@ -34,6 +30,8 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
+import org.apache.commons.collections.MapUtils;
+
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -41,7 +39,6 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -164,21 +161,11 @@ public class ShellTask extends AbstractTask {
private String parseScript(String script) {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
-
- // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
- if (taskExecutionContext.getScheduleTime() != null) {
- if (paramsMap == null) {
- paramsMap = new HashMap<>();
- }
- Date date = taskExecutionContext.getScheduleTime();
- if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
- date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
- }
- String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
- Property p = new Property();
- p.setValue(dateTime);
- p.setProp(Constants.PARAMETER_DATETIME);
- paramsMap.put(Constants.PARAMETER_DATETIME, p);
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
}
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index 6939439..64c60b0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -30,7 +30,10 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
+import org.apache.commons.collections.MapUtils;
+
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -110,12 +113,14 @@ public class SparkTask extends AbstractYarnTask {
// replace placeholder, and combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
-
- String command = null;
-
- if (null != paramsMap) {
- command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
}
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
+
+ String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
logger.info("spark task command: {}", command);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 3c4b3ab..ee2265c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -169,9 +169,15 @@ public class SqlTask extends AbstractTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
// spell SQL according to the final user-defined variable
- if (paramsMap == null) {
+ if (MapUtils.isEmpty(paramsMap)) {
sqlBuilder.append(sql);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index 2f3e48d..ce91199 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
@@ -74,8 +77,14 @@ public class SqoopTask extends AbstractYarnTask {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(sqoopTaskExecutionContext,getParameters());
+ if (MapUtils.isEmpty(paramsMap)) {
+ paramsMap = new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())) {
+ paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap());
+ }
- if (paramsMap != null) {
+ if (MapUtils.isNotEmpty(paramsMap)) {
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
logger.info("sqoop script: {}", resultScripts);
return resultScripts;