You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2022/06/28 03:25:00 UTC
[dolphinscheduler] branch dev updated: [Feature] Time function analysis extension. (#10624)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b5184138fa [Feature] Time function analysis extension. (#10624)
b5184138fa is described below
commit b5184138fab7c43b38e23e899b5b23d606fe7e9d
Author: WangJPLeo <10...@users.noreply.github.com>
AuthorDate: Tue Jun 28 11:24:51 2022 +0800
[Feature] Time function analysis extension. (#10624)
* Time function analysis extension.
* param add.
* clear useless logs and update method notes
* permission omission fix.
* extending time functions to optimize static methods
* e2e rerun.
---
.../service/impl/ProcessInstanceServiceImpl.java | 6 +-
.../api/service/impl/ProjectServiceImpl.java | 2 +-
.../api/service/impl/TaskGroupServiceImpl.java | 2 +
.../api/service/impl/WorkerGroupServiceImpl.java | 4 +-
.../api/service/ProcessInstanceServiceTest.java | 7 +
.../api/service/ProjectServiceTest.java | 11 +-
.../common/expand/CuringGlobalParamsService.java | 64 +++++++++
.../expand/DolphinSchedulerCuringGlobalParams.java | 96 +++++++++++++
.../TimePlaceholderResolverExpandService.java | 35 +++++
.../TimePlaceholderResolverExpandServiceImpl.java | 34 +++++
.../common/utils/ParameterUtils.java | 52 -------
.../expand/CuringGlobalParamsServiceTest.java | 151 +++++++++++++++++++++
.../TimePlaceholderResolverExpandServiceTest.java | 51 +++++++
.../common/utils/ParameterUtilsTest.java | 84 ++----------
.../master/runner/MasterSchedulerService.java | 7 +-
.../master/runner/WorkflowExecuteRunnable.java | 15 +-
.../server/master/WorkflowExecuteTaskTest.java | 6 +-
.../service/process/ProcessServiceImpl.java | 35 +++--
.../service/process/ProcessServiceTest.java | 9 ++
19 files changed, 516 insertions(+), 155 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index ca137aaf0d..3b9e3f89c7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -153,6 +154,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
private ScheduleMapper scheduleMapper;
+ @Autowired
+ private CuringGlobalParamsService curingGlobalParamsService;
+
/**
* return top n SUCCESS process instance order by running time which started between startTime and endTime
*/
@@ -561,7 +565,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.setScheduleTime(schedule);
List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class);
Map<String, String> globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
- globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone);
+ globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(), globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule, timezone);
processInstance.setTimeout(timeout);
processInstance.setTenantCode(tenantCode);
processInstance.setGlobalParams(globalParams);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 17575f5dab..b10ee78392 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -475,7 +475,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
putMsg(result, Status.SUCCESS);
return result;
}
- List<Project> projects = projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(), new ArrayList<>(projectIds));
+ List<Project> projects = projectMapper.selectBatchIds(projectIds);
result.put(Constants.DATA_LIST, projects);
putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
index 32ea9ad39f..e02fbca658 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
@@ -41,6 +41,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -104,6 +105,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
taskGroup.setCreateTime(new Date());
taskGroup.setUpdateTime(new Date());
if (taskGroupMapper.insert(taskGroup) > 0) {
+ permissionPostHandle(AuthorizationType.TASK_GROUP, loginUser.getId(), Collections.singletonList(taskGroup.getId()),logger);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.CREATE_TASK_GROUP_ERROR);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index d371d32c18..c083a8a52f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -122,11 +122,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroupMapper.updateById(workerGroup);
} else {
workerGroupMapper.insert(workerGroup);
- }
- putMsg(result, Status.SUCCESS);
- if (id != 0) {
permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger);
}
+ putMsg(result, Status.SUCCESS);
return result;
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index fe64eab94d..fe8149536d 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -36,6 +36,8 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
+import org.apache.dolphinscheduler.common.expand.DolphinSchedulerCuringGlobalParams;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -70,6 +72,8 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import org.springframework.beans.factory.annotation.Autowired;
+
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
@@ -126,6 +130,9 @@ public class ProcessInstanceServiceTest {
@Mock
ScheduleMapper scheduleMapper;
+ @Mock
+ CuringGlobalParamsService curingGlobalParamsService;
+
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java
index b1e81e10ba..8eb675ea4f 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProjectServiceTest.java
@@ -108,7 +108,6 @@ public class ProjectServiceTest {
public void testCheckProjectAndAuth() {
long projectCode = 1L;
-// Mockito.when(projectUserMapper.queryProjectRelation(1, 1)).thenReturn(getProjectUser());
User loginUser = getLoginUser();
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, null, projectCode, PROJECT);
@@ -142,7 +141,6 @@ public class ProjectServiceTest {
project1.setUserId(2);
loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.PROJECTS, loginUser.getId(), PROJECT, baseServiceLogger)).thenReturn(true);
-// Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.PROJECTS, new Object[]{project.getId()}, 0, baseServiceLogger)).thenReturn(true);
result2 = projectService.checkProjectAndAuth(loginUser, project1, projectCode,PROJECT);
Assert.assertEquals(Status.USER_NO_OPERATION_PROJECT_PERM, result2.get(Constants.STATUS));
@@ -329,9 +327,8 @@ public class ProjectServiceTest {
List<Integer> list = new ArrayList<>(1);
list.add(1);
// not admin user
- // Mockito.when(projectMapper.queryProjectCreatedAndAuthorizedByUserId(1)).thenReturn(getList());
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), projectLogger)).thenReturn(set);
- Mockito.when(projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(),list)).thenReturn(getList());
+ Mockito.when(projectMapper.selectBatchIds(set)).thenReturn(getList());
result = projectService.queryProjectCreatedAndAuthorizedByUser(loginUser);
List<Project> notAdminUserResult = (List<Project>) result.get(Constants.DATA_LIST);
Assert.assertTrue(CollectionUtils.isNotEmpty(notAdminUserResult));
@@ -339,8 +336,7 @@ public class ProjectServiceTest {
//admin user
loginUser.setUserType(UserType.ADMIN_USER);
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), projectLogger)).thenReturn(set);
- Mockito.when(projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(),list)).thenReturn(getList());
-// Mockito.when(projectMapper.selectList(null)).thenReturn(getList());
+ Mockito.when(projectMapper.selectBatchIds(set)).thenReturn(getList());
result = projectService.queryProjectCreatedAndAuthorizedByUser(loginUser);
List<Project> projects = (List<Project>) result.get(Constants.DATA_LIST);
@@ -363,9 +359,6 @@ public class ProjectServiceTest {
@Test
public void testQueryUnauthorizedProject() {
-// Mockito.when(projectMapper.queryProjectExceptUserId(2)).thenReturn(getList());
- // Mockito.when(projectMapper.queryProjectCreatedByUser(2)).thenReturn(getList());
-// Mockito.when(projectMapper.queryAuthedProjectListByUserId(2)).thenReturn(getSingleList());
Set<Integer> set = new HashSet();
set.add(1);
// test admin user
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java
new file mode 100644
index 0000000000..63ea658c9f
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsService.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.expand;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+public interface CuringGlobalParamsService {
+
+ /**
+ * time function need expand
+ * @param placeholderName
+ * @return
+ */
+ boolean timeFunctionNeedExpand(String placeholderName);
+
+ /**
+ * time function extension
+ * @param processInstanceId
+ * @param timezone
+ * @param placeholderName
+ * @return
+ */
+ String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName);
+
+ /**
+ * convert parameter placeholders
+ * @param val
+ * @param allParamMap
+ * @return
+ */
+ String convertParameterPlaceholders(String val, Map<String, String> allParamMap);
+
+ /**
+ * curing global params
+ * @param processInstanceId
+ * @param globalParamMap
+ * @param globalParamList
+ * @param commandType
+ * @param scheduleTime
+ * @param timezone
+ * @return
+ */
+ String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone);
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java
new file mode 100644
index 0000000000..dde31af4fa
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.expand;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Component
+public class DolphinSchedulerCuringGlobalParams implements CuringGlobalParamsService {
+
+ @Autowired
+ private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+ @Override
+ public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
+ return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
+ }
+
+ @Override
+ public boolean timeFunctionNeedExpand(String placeholderName) {
+ return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
+ }
+
+ @Override
+ public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
+ return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
+ }
+
+ @Override
+ public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
+ if (globalParamList == null || globalParamList.isEmpty()) {
+ return null;
+ }
+ Map<String, String> globalMap = new HashMap<>();
+ if (globalParamMap != null) {
+ globalMap.putAll(globalParamMap);
+ }
+ Map<String, String> allParamMap = new HashMap<>();
+ //If it is a complement, a complement time needs to be passed in, according to the task type
+ Map<String, String> timeParams = BusinessTimeUtils.
+ getBusinessTime(commandType, scheduleTime, timezone);
+
+ if (timeParams != null) {
+ allParamMap.putAll(timeParams);
+ }
+ allParamMap.putAll(globalMap);
+ Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
+ Map<String, String> resolveMap = new HashMap<>();
+ for (Map.Entry<String, String> entry : entries) {
+ String val = entry.getValue();
+ if (val.startsWith("$")) {
+ String str = "";
+ if (timeFunctionNeedExpand(val)) {
+ str = timeFunctionExtension(processInstanceId, timezone, val);
+ } else {
+ str = convertParameterPlaceholders(val, allParamMap);
+ }
+ resolveMap.put(entry.getKey(), str);
+ }
+ }
+ globalMap.putAll(resolveMap);
+ for (Property property : globalParamList) {
+ String val = globalMap.get(property.getProp());
+ if (val != null) {
+ property.setValue(val);
+ }
+ }
+ return JSONUtils.toJsonString(globalParamList);
+ }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java
new file mode 100644
index 0000000000..bdd811522c
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.expand;
+
+public interface TimePlaceholderResolverExpandService {
+
+ /**
+ * check is need expand function
+ * @param placeholderName
+ * @return
+ */
+ boolean timeFunctionNeedExpand(String placeholderName);
+
+ /**
+ * time function extension
+ * @param placeholderName
+ * @return
+ */
+ String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName);
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java
new file mode 100644
index 0000000000..b37fcf076b
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.expand;
+
+import org.springframework.stereotype.Component;
+
+@Component
+public class TimePlaceholderResolverExpandServiceImpl implements TimePlaceholderResolverExpandService {
+
+ @Override
+ public boolean timeFunctionNeedExpand(String placeholderName) {
+ return false;
+ }
+
+ @Override
+ public String timeFunctionExtension(Integer processInstanceId, String timeZone, String placeholderName) {
+ return null;
+ }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
index feaae67297..c4edc5d1b7 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ParameterUtils.java
@@ -77,58 +77,6 @@ public class ParameterUtils {
return parameterString;
}
- /**
- * curing user define parameters
- *
- * @param globalParamMap global param map
- * @param globalParamList global param list
- * @param commandType command type
- * @param scheduleTime schedule time
- * @return curing user define parameters
- */
- public static String curingGlobalParams(Map<String, String> globalParamMap, List<Property> globalParamList,
- CommandType commandType, Date scheduleTime, String timezone) {
-
- if (globalParamList == null || globalParamList.isEmpty()) {
- return null;
- }
-
- Map<String, String> globalMap = new HashMap<>();
- if (globalParamMap != null) {
- globalMap.putAll(globalParamMap);
- }
- Map<String, String> allParamMap = new HashMap<>();
- //If it is a complement, a complement time needs to be passed in, according to the task type
- Map<String, String> timeParams = BusinessTimeUtils.
- getBusinessTime(commandType, scheduleTime, timezone);
-
- if (timeParams != null) {
- allParamMap.putAll(timeParams);
- }
-
- allParamMap.putAll(globalMap);
-
- Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
-
- Map<String, String> resolveMap = new HashMap<>();
- for (Map.Entry<String, String> entry : entries) {
- String val = entry.getValue();
- if (val.startsWith("$")) {
- String str = ParameterUtils.convertParameterPlaceholders(val, allParamMap);
- resolveMap.put(entry.getKey(), str);
- }
- }
- globalMap.putAll(resolveMap);
-
- for (Property property : globalParamList) {
- String val = globalMap.get(property.getProp());
- if (val != null) {
- property.setValue(val);
- }
- }
- return JSONUtils.toJsonString(globalParamList);
- }
-
/**
* handle escapes
*
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java
new file mode 100644
index 0000000000..f6ea074aa0
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/CuringGlobalParamsServiceTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.dolphinscheduler.common.expand;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(MockitoJUnitRunner.class)
+public class CuringGlobalParamsServiceTest {
+
+ private static final String placeHolderName = "$[yyyy-MM-dd-1]";
+
+ @Mock
+ private CuringGlobalParamsService curingGlobalParamsService;
+
+ @InjectMocks
+ private DolphinSchedulerCuringGlobalParams dolphinSchedulerCuringGlobalParams;
+
+ @Mock
+ private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+ @InjectMocks
+ private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl;
+
+ private final Map<String, String> globalParamMap = new HashMap<>();
+
+ @Before
+ public void init() {
+ globalParamMap.put("globalParams1", "Params1");
+ }
+
+ @Test
+ public void testConvertParameterPlaceholders() {
+ Mockito.when(curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, globalParamMap)).thenReturn("2022-06-26");
+ String result = curingGlobalParamsService.convertParameterPlaceholders(placeHolderName, globalParamMap);
+ Assert.assertNotNull(result);
+ }
+
+ @Test
+ public void testTimeFunctionNeedExpand() {
+ boolean result = curingGlobalParamsService.timeFunctionNeedExpand(placeHolderName);
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void testTimeFunctionExtension() {
+ String result = curingGlobalParamsService.timeFunctionExtension(1, "", placeHolderName);
+ Assert.assertNull(result);
+ }
+
+ @Test
+ public void testCuringGlobalParams() {
+ //define globalMap
+ Map<String, String> globalParamMap = new HashMap<>();
+ globalParamMap.put("globalParams1", "Params1");
+
+ //define globalParamList
+ List<Property> globalParamList = new ArrayList<>();
+
+ //define scheduleTime
+ Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00");
+
+ //test globalParamList is null
+ String result = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assert.assertNull(result);
+ Assert.assertNull(dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null));
+ Assert.assertNull(dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null));
+
+ //test globalParamList is not null
+ Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam");
+ globalParamList.add(property);
+
+ String result2 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList));
+
+ String result3 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null);
+ Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList));
+
+ String result4 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList));
+
+ //test var $ startsWith
+ globalParamMap.put("bizDate", "${system.biz.date}");
+ globalParamMap.put("b1zCurdate", "${system.biz.curdate}");
+
+ Property property2 = new Property("testParamList1", Direct.IN, DataType.VARCHAR, "testParamList");
+ Property property3 = new Property("testParamList2", Direct.IN, DataType.VARCHAR, "{testParamList1}");
+ Property property4 = new Property("testParamList3", Direct.IN, DataType.VARCHAR, "${b1zCurdate}");
+
+ globalParamList.add(property2);
+ globalParamList.add(property3);
+ globalParamList.add(property4);
+
+ String result5 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList));
+
+ Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, "");
+ globalParamList.add(testStartParamProperty);
+ Property testStartParam2Property = new Property("testStartParam2", Direct.IN, DataType.VARCHAR, "$[yyyy-MM-dd+1]");
+ globalParamList.add(testStartParam2Property);
+ globalParamMap.put("testStartParam", "");
+ globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]");
+
+ Map<String, String> startParamMap = new HashMap<>(2);
+ startParamMap.put("testStartParam", "$[yyyyMMdd]");
+
+ for (Map.Entry<String, String> param : globalParamMap.entrySet()) {
+ String val = startParamMap.get(param.getKey());
+ if (val != null) {
+ param.setValue(val);
+ }
+ }
+
+ String result6 = dolphinSchedulerCuringGlobalParams.curingGlobalParams(1, globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
+ Assert.assertTrue(result6.contains("20191220"));
+ }
+}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java
new file mode 100644
index 0000000000..542869530e
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/expand/TimePlaceholderResolverExpandServiceTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.expand;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TimePlaceholderResolverExpandServiceTest {
+
+ @Mock
+ private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+ @InjectMocks
+ private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl;
+
+ private static final String placeHolderName = "$[yyyy-MM-dd-1]";
+
+ @Test
+ public void testTimePlaceholderResolverExpandService() {
+ boolean checkResult = timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeHolderName);
+ Assert.assertFalse(checkResult);
+ String resultString = timePlaceholderResolverExpandService.timeFunctionExtension(1, "", placeHolderName);
+ Assert.assertTrue(StringUtils.isEmpty(resultString));
+
+ boolean implCheckResult = timePlaceholderResolverExpandServiceImpl.timeFunctionNeedExpand(placeHolderName);
+ Assert.assertFalse(implCheckResult);
+ String implResultString = timePlaceholderResolverExpandServiceImpl.timeFunctionExtension(1, "", placeHolderName);
+ Assert.assertTrue(StringUtils.isEmpty(implResultString));
+ }
+}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
index 01a1eab9a7..6f9bb89c0f 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java
@@ -21,6 +21,8 @@ import static org.apache.dolphinscheduler.common.utils.placeholder.TimePlacehold
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandService;
+import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandServiceImpl;
import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@@ -34,13 +36,25 @@ import java.util.List;
import java.util.Map;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@RunWith(MockitoJUnitRunner.class)
public class ParameterUtilsTest {
public static final Logger logger = LoggerFactory.getLogger(ParameterUtilsTest.class);
+ @Mock
+ private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+ @InjectMocks
+ private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl;
+
/**
* Test convertParameterPlaceholders
*/
@@ -84,76 +98,6 @@ public class ParameterUtilsTest {
parameterString);
}
- /**
- * Test curingGlobalParams
- */
- @Test
- public void testCuringGlobalParams() {
- //define globalMap
- Map<String, String> globalParamMap = new HashMap<>();
- globalParamMap.put("globalParams1", "Params1");
-
- //define globalParamList
- List<Property> globalParamList = new ArrayList<>();
-
- //define scheduleTime
- Date scheduleTime = DateUtils.stringToDate("2019-12-20 00:00:00");
-
- //test globalParamList is null
- String result = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assert.assertNull(result);
- Assert.assertNull(ParameterUtils.curingGlobalParams(null, null, CommandType.START_CURRENT_TASK_PROCESS, null, null));
- Assert.assertNull(ParameterUtils.curingGlobalParams(globalParamMap, null, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null));
-
- //test globalParamList is not null
- Property property = new Property("testGlobalParam", Direct.IN, DataType.VARCHAR, "testGlobalParam");
- globalParamList.add(property);
-
- String result2 = ParameterUtils.curingGlobalParams(null, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assert.assertEquals(result2, JSONUtils.toJsonString(globalParamList));
-
- String result3 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, null, null);
- Assert.assertEquals(result3, JSONUtils.toJsonString(globalParamList));
-
- String result4 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assert.assertEquals(result4, JSONUtils.toJsonString(globalParamList));
-
- //test var $ startsWith
- globalParamMap.put("bizDate", "${system.biz.date}");
- globalParamMap.put("b1zCurdate", "${system.biz.curdate}");
-
- Property property2 = new Property("testParamList1", Direct.IN, DataType.VARCHAR, "testParamList");
- Property property3 = new Property("testParamList2", Direct.IN, DataType.VARCHAR, "{testParamList1}");
- Property property4 = new Property("testParamList3", Direct.IN, DataType.VARCHAR, "${b1zCurdate}");
-
- globalParamList.add(property2);
- globalParamList.add(property3);
- globalParamList.add(property4);
-
- String result5 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assert.assertEquals(result5, JSONUtils.toJsonString(globalParamList));
-
- Property testStartParamProperty = new Property("testStartParam", Direct.IN, DataType.VARCHAR, "");
- globalParamList.add(testStartParamProperty);
- Property testStartParam2Property = new Property("testStartParam2", Direct.IN, DataType.VARCHAR, "$[yyyy-MM-dd+1]");
- globalParamList.add(testStartParam2Property);
- globalParamMap.put("testStartParam", "");
- globalParamMap.put("testStartParam2", "$[yyyy-MM-dd+1]");
-
- Map<String, String> startParamMap = new HashMap<>(2);
- startParamMap.put("testStartParam", "$[yyyyMMdd]");
-
- for (Map.Entry<String, String> param : globalParamMap.entrySet()) {
- String val = startParamMap.get(param.getKey());
- if (val != null) {
- param.setValue(val);
- }
- }
-
- String result6 = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, CommandType.START_CURRENT_TASK_PROCESS, scheduleTime, null);
- Assert.assertTrue(result6.contains("20191220"));
- }
-
/**
* Test handleEscapes
*/
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 023d871355..3f5a715d7c 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
+import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -104,6 +105,9 @@ public class MasterSchedulerService extends BaseDaemonThread {
@Autowired
private StateWheelExecuteThread stateWheelExecuteThread;
+ @Autowired
+ private CuringGlobalParamsService curingGlobalParamsService;
+
protected MasterSchedulerService() {
super("MasterCommandLoopThread");
}
@@ -183,7 +187,8 @@ public class MasterSchedulerService extends BaseDaemonThread {
, nettyExecutorManager
, processAlertManager
, masterConfig
- , stateWheelExecuteThread);
+ , stateWheelExecuteThread
+ , curingGlobalParamsService);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
if (processInstance.getTimeout() > 0) {
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9f7643832b..9fa452c66e 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
+import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -235,6 +236,11 @@ public class WorkflowExecuteRunnable implements Runnable {
*/
private final StateWheelExecuteThread stateWheelExecuteThread;
+ /**
+ * curing global params service
+ */
+ private final CuringGlobalParamsService curingGlobalParamsService;
+
/**
* @param processInstance processInstance
* @param processService processService
@@ -248,13 +254,15 @@ public class WorkflowExecuteRunnable implements Runnable {
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
- , StateWheelExecuteThread stateWheelExecuteThread) {
+ , StateWheelExecuteThread stateWheelExecuteThread
+ , CuringGlobalParamsService curingGlobalParamsService) {
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.stateWheelExecuteThread = stateWheelExecuteThread;
+ this.curingGlobalParamsService = curingGlobalParamsService;
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
}
@@ -999,10 +1007,11 @@ public class WorkflowExecuteRunnable implements Runnable {
if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) {
processInstance.setScheduleTime(complementListDate.get(0));
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
+ String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)));
+ CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE));
+ processInstance.setGlobalParams(globalParams);
processService.updateProcessInstance(processInstance);
}
}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
index f2d263f699..8c7104ca30 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteTaskTest.java
@@ -26,6 +26,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@@ -86,6 +87,8 @@ public class WorkflowExecuteTaskTest {
private StateWheelExecuteThread stateWheelExecuteThread;
+ private CuringGlobalParamsService curingGlobalParamsService;
+
@Before
public void init() throws Exception {
applicationContext = mock(ApplicationContext.class);
@@ -113,7 +116,8 @@ public class WorkflowExecuteTaskTest {
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
- workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread));
+ curingGlobalParamsService = mock(CuringGlobalParamsService.class);
+ workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteRunnable(processInstance, processService, null, null, config, stateWheelExecuteThread, curingGlobalParamsService));
// prepareProcess init dag
Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag");
dag.setAccessible(true);
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index d03d165071..a8ae3f33a9 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -276,6 +277,9 @@ public class ProcessServiceImpl implements ProcessService {
@Autowired
private K8sMapper k8sMapper;
+ @Autowired
+ private CuringGlobalParamsService curingGlobalParamsService;
+
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
@@ -802,11 +806,12 @@ public class ProcessServiceImpl implements ProcessService {
timezoneId = commandParamMap.get(Constants.SCHEDULE_TIMEZONE);
}
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime(), timezoneId));
+ String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ getCommandTypeIfComplement(processInstance, command),
+ processInstance.getScheduleTime(), timezoneId);
+ processInstance.setGlobalParams(globalParams);
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
@@ -953,11 +958,12 @@ public class ProcessServiceImpl implements ProcessService {
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
// Recalculate global parameters after rerun.
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- commandTypeIfComplement,
- processInstance.getScheduleTime(), timezoneId));
+ String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ commandTypeIfComplement,
+ processInstance.getScheduleTime(), timezoneId);
+ processInstance.setGlobalParams(globalParams);
processInstance.setProcessDefinition(processDefinition);
}
//reset command parameter
@@ -1139,10 +1145,11 @@ public class ProcessServiceImpl implements ProcessService {
// time zone
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
- processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId));
+ String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId);
+ processInstance.setGlobalParams(globalParams);
}
/**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 09ba19e911..0a859235ac 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -159,6 +160,9 @@ public class ProcessServiceTest {
@Mock
private ScheduleMapper scheduleMapper;
+ @Mock
+ CuringGlobalParamsService curingGlobalParamsService;
+
@Test
public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance();
@@ -365,6 +369,11 @@ public class ProcessServiceTest {
command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
+ Mockito.when(curingGlobalParamsService.curingGlobalParams(0,
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.START_PROCESS,
+ processInstance.getScheduleTime(), null)).thenReturn("\"testStartParam1\"");
ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));