You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/06/30 14:45:33 UTC
[dolphinscheduler] branch dev updated: [Optimization] Calculate global parameter and local parameter at master. (#10704)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 8f621ff98b [Optimization] Calculate global parameter and local parameter at master. (#10704)
8f621ff98b is described below
commit 8f621ff98bba10b723014e8625d3cd9d10cb1170
Author: WangJPLeo <10...@users.noreply.github.com>
AuthorDate: Thu Jun 30 22:45:25 2022 +0800
[Optimization] Calculate global parameter and local parameter at master. (#10704)
* Global parameter and local parameter calculation external expansion.
* k8s task ut fix.
* TimePlaceholderUtils import DateUtils fix
* follow the review comments to fix.
* follow the review comments to fix.
* e2e rerun
---
.../service/impl/ProcessInstanceServiceImpl.java | 4 +-
.../api/service/ProcessInstanceServiceTest.java | 6 +-
.../apache/dolphinscheduler/common/Constants.java | 2 +
.../expand/DolphinSchedulerCuringGlobalParams.java | 96 ---------
.../utils/placeholder/TimePlaceholderUtils.java | 2 +-
.../common/utils/ParameterUtilsTest.java | 17 --
.../builder/TaskExecutionContextBuilder.java | 27 +++
.../master/runner/MasterSchedulerService.java | 4 +-
.../master/runner/WorkflowExecuteRunnable.java | 11 +-
.../master/runner/task/BaseTaskProcessor.java | 11 +
.../server/master/WorkflowExecuteTaskTest.java | 7 +-
.../service/expand/CuringGlobalParams.java | 228 +++++++++++++++++++++
.../service/expand/CuringParamsService.java | 25 ++-
.../TimePlaceholderResolverExpandService.java | 2 +-
.../TimePlaceholderResolverExpandServiceImpl.java | 2 +-
.../service/process/ProcessServiceImpl.java | 5 +-
.../expand/CuringGlobalParamsServiceTest.java | 6 +-
.../TimePlaceholderResolverExpandServiceTest.java | 2 +-
.../service/process/ProcessServiceTest.java | 4 +-
.../plugin/task/api/TaskExecutionContext.java | 5 +
.../plugin/task/api/parser/ParamUtils.java | 85 --------
.../plugin/task/api/parser/ParameterUtils.java | 56 -----
.../plugin/task/dq/DataQualityTask.java | 15 +-
.../plugin/task/datax/DataxTask.java | 8 +-
.../plugin/task/flink/FlinkTask.java | 8 +-
.../plugin/task/http/HttpTask.java | 8 +-
.../plugin/task/jupyter/JupyterTask.java | 8 +-
.../dolphinscheduler/plugin/task/k8s/K8sTask.java | 8 +-
.../plugin/task/k8s/K8sTaskTest.java | 7 +
.../plugin/task/mlflow/MlflowTask.java | 9 +-
.../plugin/task/mr/MapReduceTask.java | 8 +-
.../plugin/task/procedure/ProcedureTask.java | 2 +-
.../plugin/task/python/PythonTask.java | 9 +-
.../plugin/task/seatunnel/SeatunnelTask.java | 8 +-
.../plugin/task/shell/ShellTask.java | 10 +-
.../plugin/task/spark/SparkTask.java | 8 +-
.../dolphinscheduler/plugin/task/sql/SqlTask.java | 2 +-
.../plugin/task/sqoop/SqoopTask.java | 9 +-
.../server/worker/runner/TaskExecuteThread.java | 34 ---
39 files changed, 344 insertions(+), 424 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 3b9e3f89c7..9b4c34ad16 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -51,7 +51,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
-import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -155,7 +155,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
private ScheduleMapper scheduleMapper;
@Autowired
- private CuringGlobalParamsService curingGlobalParamsService;
+ private CuringParamsService curingGlobalParamsService;
/**
* return top n SUCCESS process instance order by running time which started between startTime and endTime
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index fe8149536d..039da1e397 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -36,8 +36,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
-import org.apache.dolphinscheduler.common.expand.DolphinSchedulerCuringGlobalParams;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -72,7 +71,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
-import org.springframework.beans.factory.annotation.Autowired;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
@@ -131,7 +129,7 @@ public class ProcessInstanceServiceTest {
ScheduleMapper scheduleMapper;
@Mock
- CuringGlobalParamsService curingGlobalParamsService;
+ CuringParamsService curingGlobalParamsService;
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 5bc1d7657f..a081570f4a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -831,4 +831,6 @@ public final class Constants {
public static final int USER_PASSWORD_MAX_LENGTH = 20;
public static final int USER_PASSWORD_MIN_LENGTH = 2;
+
+ public static final String FUNCTION_START_WITH = "$";
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java
deleted file mode 100644
index dde31af4fa..0000000000
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.common.expand;
-
-import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.ParameterUtils;
-import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-@Component
-public class DolphinSchedulerCuringGlobalParams implements CuringGlobalParamsService {
-
- @Autowired
- private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
-
- @Override
- public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
- return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
- }
-
- @Override
- public boolean timeFunctionNeedExpand(String placeholderName) {
- return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
- }
-
- @Override
- public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
- return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
- }
-
- @Override
- public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
- if (globalParamList == null || globalParamList.isEmpty()) {
- return null;
- }
- Map<String, String> globalMap = new HashMap<>();
- if (globalParamMap != null) {
- globalMap.putAll(globalParamMap);
- }
- Map<String, String> allParamMap = new HashMap<>();
- //If it is a complement, a complement time needs to be passed in, according to the task type
- Map<String, String> timeParams = BusinessTimeUtils.
- getBusinessTime(commandType, scheduleTime, timezone);
-
- if (timeParams != null) {
- allParamMap.putAll(timeParams);
- }
- allParamMap.putAll(globalMap);
- Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
- Map<String, String> resolveMap = new HashMap<>();
- for (Map.Entry<String, String> entry : entries) {
- String val = entry.getValue();
- if (val.startsWith("$")) {
- String str = "";
- if (timeFunctionNeedExpand(val)) {
- str = timeFunctionExtension(processInstanceId, timezone, val);
- } else {
- str = convertParameterPlaceholders(val, allParamMap);
- }
- resolveMap.put(entry.getKey(), str);
- }
- }
- globalMap.putAll(resolveMap);
- for (Property property : globalParamList) {
- String val = globalMap.get(property.getProp());
- if (val != null) {
- property.setValue(val);
- }
- }
- return JSONUtils.toJsonString(globalParamList);
- }
-}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java
index ee176da299..28d96a5969 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java
@@ -512,7 +512,7 @@ public class TimePlaceholderUtils {
if (Character.isDigit(expression.charAt(index + 1))) {
String addMinuteExpr = expression.substring(index + 1);
- Date targetDate = org.apache.commons.lang.time.DateUtils
+ Date targetDate = org.apache.commons.lang3.time.DateUtils
.addMinutes(date, calcMinutes(addMinuteExpr));
String dateFormat = expression.substring(0, index);
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
index 6f9bb89c0f..ba27032ad8 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
@@ -20,27 +20,16 @@ package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.utils.placeholder.TimePlaceholderUtils.replacePlaceholders;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CommandType;
-import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandService;
-import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandServiceImpl;
import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils;
-import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
-import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
-import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import java.text.ParseException;
-import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,12 +38,6 @@ import org.slf4j.LoggerFactory;
public class ParameterUtilsTest {
public static final Logger logger = LoggerFactory.getLogger(ParameterUtilsTest.class);
- @Mock
- private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
-
- @InjectMocks
- private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl;
-
/**
* Test convertParameterPlaceholders
*/
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index 6c7154b45d..962e42a98b 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -29,7 +29,12 @@ import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
+
+import java.util.Map;
/**
* TaskExecutionContext builder
@@ -131,6 +136,28 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
return this;
}
+
+ /**
+ * build global and local params
+ * @param propertyMap
+ * @return
+ */
+ public TaskExecutionContextBuilder buildParamInfo(Map<String, Property> propertyMap) {
+ taskExecutionContext.setPrepareParamsMap(propertyMap);
+ return this;
+ }
+
+ /**
+ * build business params
+ * @param businessParamsMap
+ * @return
+ */
+ public TaskExecutionContextBuilder buildBusinessParamsMap(Map<String, Property> businessParamsMap) {
+ taskExecutionContext.setParamsMap(businessParamsMap);
+ return this;
+ }
+
+
/**
* create
*
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 5c43256623..3254d6c5b5 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
-import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -95,7 +95,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
private StateWheelExecuteThread stateWheelExecuteThread;
@Autowired
- private CuringGlobalParamsService curingGlobalParamsService;
+ private CuringParamsService curingGlobalParamsService;
private String masterAddress;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 42654665a2..25766d0a4c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -40,7 +40,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -49,7 +49,6 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
-import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -239,7 +238,7 @@ public class WorkflowExecuteRunnable implements Runnable {
/**
* curing global params service
*/
- private final CuringGlobalParamsService curingGlobalParamsService;
+ private final CuringParamsService curingParamsService;
/**
* @param processInstance processInstance
@@ -255,14 +254,14 @@ public class WorkflowExecuteRunnable implements Runnable {
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
, StateWheelExecuteThread stateWheelExecuteThread
- , CuringGlobalParamsService curingGlobalParamsService) {
+ , CuringParamsService curingParamsService) {
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
- this.curingGlobalParamsService = curingGlobalParamsService;
+ this.curingParamsService = curingParamsService;
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
}
@@ -1008,7 +1007,7 @@ public class WorkflowExecuteRunnable implements Runnable {
if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0));
- String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
+ String globalParams = curingParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE));
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
index 9af664e0e3..05348a2dd3 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
@@ -59,6 +59,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ConnectorType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
import org.apache.dolphinscheduler.plugin.task.api.model.JdbcInfo;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@@ -73,6 +74,7 @@ import org.apache.dolphinscheduler.plugin.task.k8s.K8sTaskParameters;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -123,6 +125,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected TaskPluginManager taskPluginManager;
+ protected CuringParamsService curingParamsService;
+
protected String threadLoggerInfoName;
@Override
@@ -130,6 +134,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
processService = SpringApplicationContext.getBean(ProcessService.class);
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
+ curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class);
this.taskInstance = taskInstance;
this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
@@ -301,6 +306,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
}
+ Map<String, Property> businessParamsMap = curingParamsService.preBuildBusinessParams(processInstance);
+
+ AbstractParameters baseParam = taskPluginManager.getParameters(ParametersNode.builder().taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build());
+ Map<String, Property> propertyMap = curingParamsService.paramParsingPreparation(taskInstance, baseParam, processInstance);
return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
@@ -309,6 +318,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
.buildResourceParametersInfo(resources)
.buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext)
.buildK8sTaskRelatedInfo(k8sTaskExecutionContext)
+ .buildBusinessParamsMap(businessParamsMap)
+ .buildParamInfo(propertyMap)
.create();
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
index 8c7104ca30..097ba120cb 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
@@ -26,7 +26,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
-import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -87,7 +86,7 @@ public class WorkflowExecuteTaskTest {
private StateWheelExecuteThread stateWheelExecuteThread;
- private CuringGlobalParamsService curingGlobalParamsService;
+ private CuringParamsService curingGlobalParamsService;
@Before
public void init() throws Exception {
@@ -116,7 +115,7 @@ public class WorkflowExecuteTaskTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
- curingGlobalParamsService = mock(CuringGlobalParamsService.class);
+ curingGlobalParamsService = mock(CuringParamsService.class);
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread, curingGlobalParamsService));
// prepareProcess init dag
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParams.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParams.java
new file mode 100644
index 0000000000..9055efd2a2
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParams.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.expand;
+
+import lombok.NonNull;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
+
+@Component
+public class CuringGlobalParams implements CuringParamsService {
+
+ @Autowired
+ private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+ @Override
+ public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
+ return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
+ }
+
+ @Override
+ public boolean timeFunctionNeedExpand(String placeholderName) {
+ return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
+ }
+
+ @Override
+ public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
+ return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
+ }
+
+ /**
+ * here it is judged whether external expansion calculation is required and the calculation result is obtained
+ * @param processInstanceId
+ * @param globalParamMap
+ * @param globalParamList
+ * @param commandType
+ * @param scheduleTime
+ * @param timezone
+ * @return
+ */
+ @Override
+ public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
+ if (globalParamList == null || globalParamList.isEmpty()) {
+ return null;
+ }
+ Map<String, String> globalMap = new HashMap<>();
+ if (globalParamMap != null) {
+ globalMap.putAll(globalParamMap);
+ }
+ Map<String, String> allParamMap = new HashMap<>();
+ //If it is a complement, a complement time needs to be passed in, according to the task type
+ Map<String, String> timeParams = BusinessTimeUtils.
+ getBusinessTime(commandType, scheduleTime, timezone);
+
+ if (timeParams != null) {
+ allParamMap.putAll(timeParams);
+ }
+ allParamMap.putAll(globalMap);
+ Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
+ Map<String, String> resolveMap = new HashMap<>();
+ for (Map.Entry<String, String> entry : entries) {
+ String val = entry.getValue();
+ if (val.startsWith(Constants.FUNCTION_START_WITH)) {
+ String str = "";
+ // whether external scaling calculation is required
+ if (timeFunctionNeedExpand(val)) {
+ str = timeFunctionExtension(processInstanceId, timezone, val);
+ } else {
+ str = convertParameterPlaceholders(val, allParamMap);
+ }
+ resolveMap.put(entry.getKey(), str);
+ }
+ }
+ globalMap.putAll(resolveMap);
+ for (Property property : globalParamList) {
+ String val = globalMap.get(property.getProp());
+ if (val != null) {
+ property.setValue(val);
+ }
+ }
+ return JSONUtils.toJsonString(globalParamList);
+ }
+
+ /**
+ * the global parameters and local parameters used in the worker will be prepared here.
+ *
+ * @param taskInstance
+ * @param parameters
+ * @param processInstance
+ * @return
+ */
+ @Override
+ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskInstance, @NonNull AbstractParameters parameters, @NonNull ProcessInstance processInstance) {
+ // assign value to definedParams here
+ Map<String,String> globalParamsMap = setGlobalParamsMap(processInstance);
+ Map<String, Property> globalParams = ParamUtils.getUserDefParamsMap(globalParamsMap);
+ CommandType commandType = processInstance.getCmdTypeIfComplement();
+ Date scheduleTime = processInstance.getScheduleTime();
+
+ // combining local and global parameters
+ Map<String, Property> localParams = parameters.getInputLocalParametersMap();
+
+ //stream pass params
+ Map<String, Property> varParams = parameters.getVarPoolMap();
+
+ if (globalParams.isEmpty() && localParams.isEmpty() && varParams.isEmpty()) {
+ return null;
+ }
+ // if it is a complement,
+ // you need to pass in the task instance id to locate the time
+ // of the process instance complement
+ Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
+ String timeZone = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
+ Map<String,String> params = BusinessTimeUtils.getBusinessTime(commandType, scheduleTime, timeZone);
+
+ if (globalParamsMap != null) {
+ params.putAll(globalParamsMap);
+ }
+
+ if (StringUtils.isNotBlank(taskInstance.getExecutePath())) {
+ params.put(PARAMETER_TASK_EXECUTE_PATH, taskInstance.getExecutePath());
+ }
+ params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskInstance.getId()));
+
+ if (varParams.size() != 0) {
+ globalParams.putAll(varParams);
+ }
+ if (localParams.size() != 0) {
+ globalParams.putAll(localParams);
+ }
+
+ Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Property> en = iter.next();
+ Property property = en.getValue();
+
+ if (StringUtils.isNotEmpty(property.getValue()) && property.getValue().startsWith(Constants.FUNCTION_START_WITH)) {
+ /**
+ * local parameter refers to global parameter with the same name
+ * note: the global parameters of the process instance here are solidified parameters,
+ * and there are no variables in them.
+ */
+ String val = property.getValue();
+ // whether external scaling calculation is required
+ if (timeFunctionNeedExpand(val)) {
+ val = timeFunctionExtension(taskInstance.getProcessInstanceId(), timeZone, val);
+ } else {
+ val = convertParameterPlaceholders(val, params);
+ }
+ property.setValue(val);
+ }
+ }
+ if (MapUtils.isEmpty(globalParams)) {
+ globalParams = new HashMap<>();
+ }
+ // put schedule time param to params map
+ Map<String, Property> paramsMap = preBuildBusinessParams(processInstance);
+ if (MapUtils.isNotEmpty(paramsMap)) {
+ globalParams.putAll(paramsMap);
+ }
+ return globalParams;
+ }
+
+ private Map<String, String> setGlobalParamsMap(ProcessInstance processInstance) {
+ Map<String, String> globalParamsMap = new HashMap<>(16);
+
+ // global params string
+ String globalParamsStr = processInstance.getGlobalParams();
+ if (globalParamsStr != null) {
+ List<Property> globalParamsList = JSONUtils.toList(globalParamsStr, Property.class);
+ globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
+ }
+ return globalParamsMap;
+ }
+
+ @Override
+ public Map<String, Property> preBuildBusinessParams(ProcessInstance processInstance) {
+ Map<String, Property> paramsMap = new HashMap<>();
+ // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
+ if (processInstance.getScheduleTime() != null) {
+ Date date = processInstance.getScheduleTime();
+ String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME, null);
+ Property p = new Property();
+ p.setValue(dateTime);
+ p.setProp(Constants.PARAMETER_DATETIME);
+ paramsMap.put(Constants.PARAMETER_DATETIME, p);
+ }
+ return paramsMap;
+ }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java
similarity index 69%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java
index 63ea658c9f..3342bd6bba 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsService.java
@@ -15,16 +15,21 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.expand;
+package org.apache.dolphinscheduler.service.expand;
+import lombok.NonNull;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import java.util.Date;
import java.util.List;
import java.util.Map;
-public interface CuringGlobalParamsService {
+public interface CuringParamsService {
/**
* time function need expand
@@ -61,4 +66,20 @@ public interface CuringGlobalParamsService {
* @return
*/
String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone);
+
+ /**
+ * param parsing preparation
+ * @param parameters
+ * @param taskInstance
+ * @param processInstance
+ * @return
+ */
+ Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskInstance, @NonNull AbstractParameters parameters, @NonNull ProcessInstance processInstance);
+
+ /**
+ * preBuildBusinessParams
+ * @param processInstance
+ * @return
+ */
+ Map<String, Property> preBuildBusinessParams(ProcessInstance processInstance);
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandService.java
similarity index 95%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandService.java
index bdd811522c..98a0bb2b00 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.expand;
+package org.apache.dolphinscheduler.service.expand;
public interface TimePlaceholderResolverExpandService {
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceImpl.java
similarity index 95%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceImpl.java
index b37fcf076b..2ac37b0887 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.expand;
+package org.apache.dolphinscheduler.service.expand;
import org.springframework.stereotype.Component;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 97d1266830..057d6e18a5 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -42,7 +42,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -51,7 +51,6 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
import org.apache.dolphinscheduler.dao.entity.DataSource;
@@ -273,7 +272,7 @@ public class ProcessServiceImpl implements ProcessService {
private K8sMapper k8sMapper;
@Autowired
- private CuringGlobalParamsService curingGlobalParamsService;
+ private CuringParamsService curingGlobalParamsService;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParamsServiceTest.java
similarity index 97%
rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java
rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParamsServiceTest.java
index f6ea074aa0..e89748446c 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringGlobalParamsServiceTest.java
@@ -16,7 +16,7 @@
*/
-package org.apache.dolphinscheduler.common.expand;
+package org.apache.dolphinscheduler.service.expand;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
@@ -45,10 +45,10 @@ public class CuringGlobalParamsServiceTest {
private static final String placeHolderName = "$[yyyy-MM-dd-1]";
@Mock
- private CuringGlobalParamsService curingGlobalParamsService;
+ private CuringParamsService curingGlobalParamsService;
@InjectMocks
- private DolphinSchedulerCuringGlobalParams dolphinSchedulerCuringGlobalParams;
+ private CuringGlobalParams dolphinSchedulerCuringGlobalParams;
@Mock
private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceTest.java
similarity index 97%
rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java
rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceTest.java
index 542869530e..08d04be5c5 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/TimePlaceholderResolverExpandServiceTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.common.expand;
+package org.apache.dolphinscheduler.service.expand;
import org.apache.commons.lang3.StringUtils;
import org.junit.Assert;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 0dd9734bd1..2e65a0cb5b 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
-import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
+import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -164,7 +164,7 @@ public class ProcessServiceTest {
private ScheduleMapper scheduleMapper;
@Mock
- CuringGlobalParamsService curingGlobalParamsService;
+ CuringParamsService curingGlobalParamsService;
@Test
public void testCreateSubCommand() {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
index 2ada887868..c1276d5806 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
@@ -182,6 +182,11 @@ public class TaskExecutionContext implements Serializable {
*/
private Map<String, String> definedParams;
+ /**
+ * prepare params map
+ */
+ private Map<String, Property> prepareParamsMap;
+
/**
* task AppId
*/
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParamUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParamUtils.java
index a59e1bf28d..15a7c14b76 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParamUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParamUtils.java
@@ -17,104 +17,19 @@
package org.apache.dolphinscheduler.plugin.task.api.parser;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
-import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
-import org.apache.dolphinscheduler.spi.enums.CommandType;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import com.google.common.base.Preconditions;
-
/**
* param utils
*/
public class ParamUtils {
- /**
- * parameter conversion
- * Warning:
- * When you first invoke the function of convert, the variables of localParams and varPool in the ShellParameters will be modified.
- * But in the whole system the variables of localParams and varPool have been used in other functions. I'm not sure if this current
- * situation is wrong. So I cannot modify the original logic.
- *
- * @param taskExecutionContext the context of this task instance
- * @param parameters the parameters
- * @return global params
- *
- */
- public static Map<String, Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) {
- Preconditions.checkNotNull(taskExecutionContext);
- Preconditions.checkNotNull(parameters);
- Map<String, Property> globalParams = getUserDefParamsMap(taskExecutionContext.getDefinedParams());
- Map<String,String> globalParamsMap = taskExecutionContext.getDefinedParams();
- CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
- Date scheduleTime = taskExecutionContext.getScheduleTime();
-
- // combining local and global parameters
- Map<String, Property> localParams = parameters.getInputLocalParametersMap();
-
- //stream pass params
- Map<String, Property> varParams = parameters.getVarPoolMap();
-
- if (globalParams.size() == 0 && localParams.size() == 0 && varParams.size() == 0) {
- return null;
- }
- // if it is a complement,
- // you need to pass in the task instance id to locate the time
- // of the process instance complement
- Map<String,String> params = BusinessTimeUtils
- .getBusinessTime(commandType,
- scheduleTime);
-
- if (globalParamsMap != null) {
-
- params.putAll(globalParamsMap);
- }
-
- if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) {
- params.put(PARAMETER_TASK_EXECUTE_PATH, taskExecutionContext.getExecutePath());
- }
- params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskExecutionContext.getTaskInstanceId()));
-
- if (varParams.size() != 0) {
- globalParams.putAll(varParams);
- }
- if (localParams.size() != 0) {
- globalParams.putAll(localParams);
- }
-
- Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, Property> en = iter.next();
- Property property = en.getValue();
-
- if (StringUtils.isNotEmpty(property.getValue())
- && property.getValue().startsWith("$")) {
- /**
- * local parameter refers to global parameter with the same name
- * note: the global parameters of the process instance here are solidified parameters,
- * and there are no variables in them.
- */
- String val = property.getValue();
-
- val = ParameterUtils.convertParameterPlaceholders(val, params);
- property.setValue(val);
- }
- }
-
- return globalParams;
- }
-
/**
* format convert
*
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java
index 383424f71a..8da11db459 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java
@@ -22,19 +22,15 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETE
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_SHECDULE_TIME;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
-import org.apache.dolphinscheduler.spi.enums.CommandType;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.spi.utils.DateUtils;
-import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.sql.PreparedStatement;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -152,58 +148,6 @@ public class ParameterUtils {
}
}
- /**
- * curing user define parameters
- *
- * @param globalParamMap global param map
- * @param globalParamList global param list
- * @param commandType command type
- * @param scheduleTime schedule time
- * @return curing user define parameters
- */
- public static String curingGlobalParams(Map<String, String> globalParamMap, List<Property> globalParamList,
- CommandType commandType, Date scheduleTime) {
-
- if (globalParamList == null || globalParamList.isEmpty()) {
- return null;
- }
-
- Map<String, String> globalMap = new HashMap<>();
- if (globalParamMap != null) {
- globalMap.putAll(globalParamMap);
- }
- Map<String, String> allParamMap = new HashMap<>();
- //If it is a complement, a complement time needs to be passed in, according to the task type
- Map<String, String> timeParams = BusinessTimeUtils
- .getBusinessTime(commandType, scheduleTime);
-
- if (timeParams != null) {
- allParamMap.putAll(timeParams);
- }
-
- allParamMap.putAll(globalMap);
-
- Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
-
- Map<String, String> resolveMap = new HashMap<>();
- for (Map.Entry<String, String> entry : entries) {
- String val = entry.getValue();
- if (val.startsWith("$")) {
- String str = ParameterUtils.convertParameterPlaceholders(val, allParamMap);
- resolveMap.put(entry.getKey(), str);
- }
- }
- globalMap.putAll(resolveMap);
-
- for (Property property : globalParamList) {
- String val = globalMap.get(property.getProp());
- if (val != null) {
- property.setValue(val);
- }
- }
- return JSONUtils.toJsonString(globalParamList);
- }
-
/**
* $[yyyyMMdd] replace schedule time
*/
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
index 039987da0e..578feec68d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
@@ -165,19 +165,8 @@ public class DataQualityTask extends AbstractYarnTask {
args.addAll(SparkArgsUtils.buildArgs(dataQualityParameters.getSparkParameters()));
// replace placeholder
- Map<String, Property> paramsMap = ParamUtils.convert(dqTaskExecutionContext,getParameters());
-
- String command = null;
-
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
-
- if (MapUtils.isNotEmpty(dqTaskExecutionContext.getParamsMap())) {
- paramsMap.putAll(dqTaskExecutionContext.getParamsMap());
- }
-
- command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
+ Map<String, Property> paramsMap = dqTaskExecutionContext.getPrepareParamsMap();
+ String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
logger.info("data quality task command: {}", command);
return command;
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index 10dc3dcf8c..49ec953725 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -150,13 +150,7 @@ public class DataxTask extends AbstractTaskExecutor {
public void handle() throws Exception {
try {
// replace placeholder,and combine local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
// run datax procesDataSourceService.s
String jsonFilePath = buildDataxJsonFile(paramsMap);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
index aa8b26698b..f04282a144 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
@@ -160,13 +160,7 @@ public class FlinkTask extends AbstractYarnTask {
String mainArgs = flinkParameters.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
// combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParamUtils.convert(paramsMap)));
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
index c829ad0736..47bb7e9b21 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
@@ -124,13 +124,7 @@ public class HttpTask extends AbstractTaskExecutor {
RequestBuilder builder = createRequestBuilder();
// replace placeholder,and combine local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
List<HttpProperty> httpPropertyList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
index 96f7882797..cec72c9601 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
@@ -127,13 +127,7 @@ public class JupyterTask extends AbstractTaskExecutor {
args.addAll(populateJupyterOptions());
// replace placeholder, and combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
index a5ce9992f1..64b4f55a3d 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
@@ -65,13 +65,7 @@ public class K8sTask extends AbstractK8sTask {
@Override
protected String buildCommand() {
K8sTaskMainParameters k8sTaskMainParameters = new K8sTaskMainParameters();
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
Map<String,String> namespace = JSONUtils.toMap(k8sTaskParameters.getNamespace());
String namespaceName = namespace.get(NAMESPACE_NAME);
String clusterName = namespace.get(CLUSTER);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
index 60506999e4..84c1d895f4 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
@@ -65,6 +65,13 @@ public class K8sTaskTest {
Map<String, Property> paramsMap = new HashMap<>();
paramsMap.put(DAY,property);
taskRequest.setParamsMap(paramsMap);
+
+ Map<String, Property> prepareParamsMap = new HashMap<>();
+ Property property1 = new Property();
+ property1.setProp("day");
+ property1.setValue("20220507");
+ prepareParamsMap.put("day", property1);
+ taskRequest.setPrepareParamsMap(prepareParamsMap);
k8sTask = new K8sTask(taskRequest);
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
index 03fd4dce03..e1e13d17c5 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
@@ -193,14 +193,7 @@ public class MlflowTask extends AbstractTaskExecutor {
private Map<String, Property> getParamsMap() {
// replace placeholder, and combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
- return paramsMap;
+ return taskExecutionContext.getPrepareParamsMap();
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
index 588dcda184..0a519dc65f 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
@@ -79,13 +79,7 @@ public class MapReduceTask extends AbstractYarnTask {
setMainJarName();
// replace placeholder,and combine local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
mapreduceParameters.setMainArgs(args);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 4f61ef1022..fc886983e5 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -103,7 +103,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
// get jdbc connection
connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
Map<Integer, Property> sqlParamsMap = new HashMap<>();
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String proceduerSql = formatSql(sqlParamsMap, paramsMap);
// call method
stmt = connection.prepareCall(proceduerSql);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 1e5f8f8af3..f7f94736dc 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -217,14 +217,7 @@ public class PythonTask extends AbstractTaskExecutor {
protected Map<String, Property> mergeParamsWithContext(AbstractParameters parameters) {
// replace placeholder
- Map<String, Property> paramsMap = ParamUtils.convert(taskRequest, parameters);
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskRequest.getParamsMap())) {
- paramsMap.putAll(taskRequest.getParamsMap());
- }
- return paramsMap;
+ return taskRequest.getPrepareParamsMap();
}
/**
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
index b5270a7221..2ba758d5ad 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
@@ -158,13 +158,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
private String parseScript(String script) {
// combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
index 441deb495b..b5529bc8eb 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
@@ -30,7 +30,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
-import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
@@ -41,7 +40,6 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -167,13 +165,7 @@ public class ShellTask extends AbstractTaskExecutor {
private String parseScript(String script) {
// combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
index 152cbb22ce..b30ac64ae6 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
@@ -118,13 +118,7 @@ public class SparkTask extends AbstractYarnTask {
args.addAll(populateSparkOptions());
// replace placeholder, and combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParamUtils.convert(paramsMap));
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 419118803c..ef47c07133 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -405,7 +405,7 @@ public class SqlTask extends AbstractTaskExecutor {
StringBuilder sqlBuilder = new StringBuilder();
// combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
// spell SQL according to the final user-defined variable
if (paramsMap == null) {
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
index d0e58911a2..fe4c428576 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
@@ -77,14 +77,7 @@ public class SqoopTask extends AbstractYarnTask {
String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
// combining local and global parameters
- Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
-
- if (MapUtils.isEmpty(paramsMap)) {
- paramsMap = new HashMap<>();
- }
- if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
- paramsMap.putAll(taskExecutionContext.getParamsMap());
- }
+ Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
logger.info("sqoop script: {}", resultScripts);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 7d49005395..2aa34a21fd 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -155,14 +155,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
- taskExecutionContext.setDefinedParams(getGlobalParamsMap());
taskExecutionContext.setTaskAppId(String.format("%s_%s",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
- preBuildBusinessParams();
-
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
if (null == taskChannel) {
throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
@@ -252,23 +249,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
}
- /**
- * get global paras map
- *
- * @return map
- */
- private Map<String, String> getGlobalParamsMap() {
- Map<String, String> globalParamsMap = new HashMap<>(16);
-
- // global params string
- String globalParamsStr = taskExecutionContext.getGlobalParams();
- if (globalParamsStr != null) {
- List<Property> globalParamsList = JSONUtils.toList(globalParamsStr, Property.class);
- globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
- }
- return globalParamsMap;
- }
-
/**
* kill task
*/
@@ -359,18 +339,4 @@ public class TaskExecuteThread implements Runnable, Delayed {
public AbstractTask getTask() {
return task;
}
-
- private void preBuildBusinessParams() {
- Map<String, Property> paramsMap = new HashMap<>();
- // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
- if (taskExecutionContext.getScheduleTime() != null) {
- Date date = taskExecutionContext.getScheduleTime();
- String dateTime = DateUtils.format(date, Constants.PARAMETER_FORMAT_TIME, null);
- Property p = new Property();
- p.setValue(dateTime);
- p.setProp(Constants.PARAMETER_DATETIME);
- paramsMap.put(Constants.PARAMETER_DATETIME, p);
- }
- taskExecutionContext.setParamsMap(paramsMap);
- }
}