You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2020/06/06 13:12:51 UTC
[incubator-dolphinscheduler] branch dev-1.3.0 updated: Fixbug datax
task (#2909)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev-1.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev-1.3.0 by this push:
new 7c9f596 Fixbug datax task (#2909)
7c9f596 is described below
commit 7c9f5960e5a0c2b446a5575d1703c8f644c10dce
Author: Rubik-W <39...@users.noreply.github.com>
AuthorDate: Sat Jun 6 21:12:41 2020 +0800
Fixbug datax task (#2909)
* fix: local param bug
* fix: UT bug
Co-authored-by: Rubik-W <wh...@163.com>
---
.../common/task/datax/DataxParameters.java | 11 +++--
.../common/utils/ParameterUtils.java | 2 +-
.../server/worker/task/datax/DataxTask.java | 49 ++++++++--------------
.../server/worker/task/datax/DataxTaskTest.java | 11 ++---
.../pages/dag/_source/formModel/tasks/datax.vue | 11 +++++
5 files changed, 40 insertions(+), 44 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
index 872b3aa..f54e107 100755
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
@@ -31,7 +32,7 @@ public class DataxParameters extends AbstractParameters {
/**
* if custom json configļ¼eg 0, 1
*/
- private Integer customConfig;
+ private int customConfig;
/**
* if customConfig eq 1 ,then json is usable
@@ -88,11 +89,11 @@ public class DataxParameters extends AbstractParameters {
*/
private int jobSpeedRecord;
- public Integer getCustomConfig() {
+ public int getCustomConfig() {
return customConfig;
}
- public void setCustomConfig(Integer customConfig) {
+ public void setCustomConfig(int customConfig) {
this.customConfig = customConfig;
}
@@ -184,11 +185,9 @@ public class DataxParameters extends AbstractParameters {
this.jobSpeedRecord = jobSpeedRecord;
}
-
@Override
public boolean checkParameters() {
- if (customConfig == null) return false;
- if (customConfig == 0) {
+ if (customConfig == Flag.NO.ordinal()) {
return dataSource != 0
&& dataTarget != 0
&& StringUtils.isNotEmpty(sql)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
index bbc8955..84c60db 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
@@ -48,7 +48,7 @@ public class ParameterUtils {
* @return convert parameters place holders
*/
public static String convertParameterPlaceholders(String parameterString, Map<String, String> parameterMap) {
- if (StringUtils.isEmpty(parameterString)) {
+ if (StringUtils.isEmpty(parameterString) || parameterMap == null) {
return parameterString;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index 9589a30..f636133 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.DbType;
+import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
@@ -149,9 +150,16 @@ public class DataxTask extends AbstractTask {
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
+ // combining local and global parameters
+ Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
+ taskExecutionContext.getDefinedParams(),
+ dataXParameters.getLocalParametersMap(),
+ CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
+ taskExecutionContext.getScheduleTime());
+
// run datax process
- String jsonFilePath = buildDataxJsonFile();
- String shellCommandFilePath = buildShellCommandFile(jsonFilePath);
+ String jsonFilePath = buildDataxJsonFile(paramsMap);
+ String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
@@ -184,7 +192,7 @@ public class DataxTask extends AbstractTask {
* @return datax json file name
* @throws Exception if error throws Exception
*/
- private String buildDataxJsonFile()
+ private String buildDataxJsonFile(Map<String, Property> paramsMap)
throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json",
@@ -197,26 +205,9 @@ public class DataxTask extends AbstractTask {
return fileName;
}
-
-
- if (dataXParameters.getCustomConfig() == 1){
-
+ if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){
json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
-
- /**
- * combining local and global parameters
- */
- Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- dataXParameters.getLocalParametersMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
- if (paramsMap != null){
- json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
- }
-
}else {
-
JSONObject job = new JSONObject();
job.put("content", buildDataxJobContentJson());
job.put("setting", buildDataxJobSettingJson());
@@ -227,6 +218,9 @@ public class DataxTask extends AbstractTask {
json = root.toString();
}
+ // replace placeholder
+ json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
+
logger.debug("datax job json : {}", json);
// create datax json file
@@ -359,7 +353,7 @@ public class DataxTask extends AbstractTask {
* @return shell command file name
* @throws Exception if error throws Exception
*/
- private String buildShellCommandFile(String jobConfigFilePath)
+ private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap)
throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
@@ -380,18 +374,9 @@ public class DataxTask extends AbstractTask {
sbr.append(DATAX_HOME_EVN);
sbr.append(" ");
sbr.append(jobConfigFilePath);
- String dataxCommand = sbr.toString();
- // combining local and global parameters
// replace placeholder
- Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- dataXParameters.getLocalParametersMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
- if (paramsMap != null) {
- dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap));
- }
+ String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr.toString(), ParamUtils.convert(paramsMap));
logger.debug("raw script : {}", dataxCommand);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
index a2a46ef..041f81d 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
@@ -21,9 +21,9 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import com.alibaba.fastjson.JSONObject;
-import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
@@ -273,14 +273,15 @@ public class DataxTaskTest {
setTaskParems(0);
buildDataJson();
} catch (Exception e) {
+ e.printStackTrace();
Assert.fail(e.getMessage());
}
}
public void buildDataJson() throws Exception {
- Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
+ Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile", new Class[]{Map.class});
method.setAccessible(true);
- String filePath = (String) method.invoke(dataxTask, null);
+ String filePath = (String) method.invoke(dataxTask, new Object[]{null});
Assert.assertNotNull(filePath);
}
@@ -358,9 +359,9 @@ public class DataxTaskTest {
public void testBuildShellCommandFile()
throws Exception {
try {
- Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class);
+ Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class, Map.class);
method.setAccessible(true);
- Assert.assertNotNull(method.invoke(dataxTask, "test.json"));
+ Assert.assertNotNull(method.invoke(dataxTask, "test.json", null));
}
catch (Exception e) {
Assert.fail(e.getMessage());
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue
index f1c9b75..9347b34 100755
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue
@@ -237,6 +237,12 @@
this.postStatements = a
},
/**
+ * return localParams
+ */
+ _onLocalParams (a) {
+ this.localParams = a
+ },
+ /**
* verification
*/
_verification () {
@@ -246,6 +252,11 @@
return false
}
+ // localParams Subcomponent verification
+ if (!this.$refs.refLocalParams._verifProp()) {
+ return false
+ }
+
// storage
this.$emit('on-params', {
customConfig: this.customConfig,