You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/08/20 13:40:18 UTC
[dolphinscheduler] branch 1.3.8-prepare updated: [FIX-#6007]Wrong
complement date (#6009)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 1.3.8-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.8-prepare by this push:
new d6fc749 [FIX-#6007]Wrong complement date (#6009)
d6fc749 is described below
commit d6fc749ca39d43abc340b82562060ed8885dd761
Author: Kirs <ac...@163.com>
AuthorDate: Fri Aug 20 21:39:58 2021 +0800
[FIX-#6007]Wrong complement date (#6009)
---
.../server/entity/TaskExecutionContext.java | 14 ++++++++++++
.../server/worker/runner/TaskExecuteThread.java | 26 +++++++++++++++++++++-
.../server/worker/task/datax/DataxTask.java | 9 +++++++-
.../server/worker/task/flink/FlinkTask.java | 9 ++++++++
.../server/worker/task/http/HttpTask.java | 9 ++++++++
.../server/worker/task/mr/MapReduceTask.java | 9 ++++++++
.../server/worker/task/python/PythonTask.java | 10 +++++++++
.../server/worker/task/shell/ShellTask.java | 22 +++++++-----------
.../server/worker/task/sql/SqlTask.java | 10 +++++++--
.../server/worker/task/sqoop/SqoopTask.java | 10 ++++++++-
10 files changed, 109 insertions(+), 19 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 41331ab..e6df0f6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.entity;
+import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
@@ -192,6 +193,19 @@ public class TaskExecutionContext implements Serializable{
private SqoopTaskExecutionContext sqoopTaskExecutionContext;
/**
+ * business param
+ */
+ private Map<String, Property> paramsMap;
+
+ public Map<String, Property> getParamsMap() {
+ return paramsMap;
+ }
+
+ public void setParamsMap(Map<String, Property> paramsMap) {
+ this.paramsMap = paramsMap;
+ }
+
+ /**
* procedure TaskExecutionContext
*/
private ProcedureTaskExecutionContext procedureTaskExecutionContext;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 81f14e7..77a775d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,14 +17,20 @@
package org.apache.dolphinscheduler.server.worker.runner;
+import static java.util.Calendar.DAY_OF_MONTH;
+
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.MapUtils;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -128,7 +134,7 @@ public class TaskExecuteThread implements Runnable {
// task init
task.init();
-
+ preBuildBusinessParams();
// task handle
task.handle();
@@ -154,6 +160,24 @@ public class TaskExecuteThread implements Runnable {
}
}
+
+ private void preBuildBusinessParams(){
+ Map<String, Property> paramsMap = new HashMap<>();
+ // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
+ if (taskExecutionContext.getScheduleTime() != null) {
+ Date date = taskExecutionContext.getScheduleTime();
+ if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
+ date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
+ }
+ String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
+ Property p = new Property();
+ p.setValue(dateTime);
+ p.setProp(Constants.PARAMETER_DATETIME);
+ paramsMap.put(Constants.PARAMETER_DATETIME, p);
+ }
+ taskExecutionContext.setParamsMap(paramsMap);
+ }
+
/**
* when task finish, clear execute path.
*/
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index afb9115..0522187 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;
@@ -57,6 +58,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -157,7 +159,12 @@ public class DataxTask extends AbstractTask {
dataXParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
-
+ if(MapUtils.isEmpty(paramsMap)){
+ paramsMap=new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
// run datax process
String jsonFilePath = buildDataxJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 4d34190..450964d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -31,7 +31,10 @@ import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
+import org.apache.commons.collections.MapUtils;
+
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -86,6 +89,12 @@ public class FlinkTask extends AbstractYarnTask {
flinkParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
+ if(MapUtils.isEmpty(paramsMap)){
+ paramsMap=new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
logger.info("param Map : {}", paramsMap);
if (paramsMap != null) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index 8dc5659..acc3eab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.worker.task.http;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
+
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.Charsets;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -51,6 +53,7 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -148,6 +151,12 @@ public class HttpTask extends AbstractTask {
httpParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
+ if(MapUtils.isEmpty(paramsMap)){
+ paramsMap=new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
List<HttpProperty> httpPropertyList = new ArrayList<>();
if(CollectionUtils.isNotEmpty(httpParameters.getHttpParams() )){
for (HttpProperty httpProperty: httpParameters.getHttpParams()) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index f60b1cb..4988794 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -32,7 +32,10 @@ import org.apache.dolphinscheduler.server.utils.MapReduceArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
+import org.apache.commons.collections.MapUtils;
+
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -90,6 +93,12 @@ public class MapReduceTask extends AbstractYarnTask {
mapreduceParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
+ if(MapUtils.isEmpty(paramsMap)){
+ paramsMap=new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
if (paramsMap != null) {
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index 7a66227..f8df005 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -29,8 +29,12 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
+
+import org.apache.commons.collections.MapUtils;
+
import org.slf4j.Logger;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -120,6 +124,12 @@ public class PythonTask extends AbstractTask {
pythonParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
+ if(MapUtils.isEmpty(paramsMap)){
+ paramsMap=new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
if (paramsMap != null){
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index a331765..cca2a04 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -32,6 +32,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
+
+import org.apache.commons.collections.MapUtils;
+
import org.slf4j.Logger;
import java.io.File;
@@ -138,20 +141,11 @@ public class ShellTask extends AbstractTask {
shellParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
- // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
- if (taskExecutionContext.getScheduleTime() != null) {
- if (paramsMap == null) {
- paramsMap = new HashMap<>();
- }
- Date date = taskExecutionContext.getScheduleTime();
- if (CommandType.COMPLEMENT_DATA.getCode() == taskExecutionContext.getCmdTypeIfComplement()) {
- date = DateUtils.add(taskExecutionContext.getScheduleTime(), DAY_OF_MONTH, 1);
- }
- String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME);
- Property p = new Property();
- p.setValue(dateTime);
- p.setProp(Constants.PARAMETER_DATETIME);
- paramsMap.put(Constants.PARAMETER_DATETIME, p);
+ if(MapUtils.isEmpty(paramsMap)){
+ paramsMap=new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
}
script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index af1a5c5..f021cb7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.sql.Connection;
@@ -181,9 +182,14 @@ public class SqlTask extends AbstractTask {
sqlParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
-
+ if(MapUtils.isEmpty(paramsMap)){
+ paramsMap=new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())){
+ paramsMap.putAll(taskExecutionContext.getParamsMap());
+ }
// spell SQL according to the final user-defined variable
- if(paramsMap == null){
+ if(paramsMap.isEmpty()){
sqlBuilder.append(sql);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index 00d94f0..da5898d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -28,6 +28,9 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
+import org.apache.commons.collections.MapUtils;
+
+import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
@@ -78,7 +81,12 @@ public class SqoopTask extends AbstractYarnTask {
sqoopParameters.getLocalParametersMap(),
CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
sqoopTaskExecutionContext.getScheduleTime());
-
+ if(MapUtils.isEmpty(paramsMap)){
+ paramsMap=new HashMap<>();
+ }
+ if (MapUtils.isNotEmpty(sqoopTaskExecutionContext.getParamsMap())){
+ paramsMap.putAll(sqoopTaskExecutionContext.getParamsMap());
+ }
if (paramsMap != null) {
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
logger.info("sqoop script: {}", resultScripts);