You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/06/30 12:03:57 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #10704: [Optimization] Global parameter and local parameter calculation external expansion.

caishunfeng commented on code in PR #10704:
URL: https://github.com/apache/dolphinscheduler/pull/10704#discussion_r910933942


##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/DolphinSchedulerCuringGlobalParams.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.expand;
+
+import com.google.common.base.Preconditions;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
+
+@Component
+public class DolphinSchedulerCuringGlobalParams implements CuringParamsService {
+
+    private static final Logger logger = LoggerFactory.getLogger(DolphinSchedulerCuringGlobalParams.class);
+
+    @Autowired
+    private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+    @Override
+    public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
+        return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
+    }
+
+    @Override
+    public boolean timeFunctionNeedExpand(String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
+    }
+
+    @Override
+    public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
+    }
+
+    @Override
+    public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
+        if (globalParamList == null || globalParamList.isEmpty()) {
+            return null;
+        }
+        Map<String, String> globalMap = new HashMap<>();
+        if (globalParamMap != null) {
+            globalMap.putAll(globalParamMap);
+        }
+        Map<String, String> allParamMap = new HashMap<>();
+        //If it is a complement, a complement time needs to be passed in, according to the task type
+        Map<String, String> timeParams = BusinessTimeUtils.
+                getBusinessTime(commandType, scheduleTime, timezone);
+
+        if (timeParams != null) {
+            allParamMap.putAll(timeParams);
+        }
+        allParamMap.putAll(globalMap);
+        Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
+        Map<String, String> resolveMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : entries) {
+            String val = entry.getValue();
+            if (val.startsWith("$")) {
+                String str = "";
+                // whether external scaling calculation is required
+                if (timeFunctionNeedExpand(val)) {
+                    str = timeFunctionExtension(processInstanceId, timezone, val);
+                } else {
+                    str = convertParameterPlaceholders(val, allParamMap);
+                }
+                resolveMap.put(entry.getKey(), str);
+            }
+        }
+        globalMap.putAll(resolveMap);
+        for (Property property : globalParamList) {
+            String val = globalMap.get(property.getProp());
+            if (val != null) {
+                property.setValue(val);
+            }
+        }
+        return JSONUtils.toJsonString(globalParamList);
+    }
+
+    @Override
+    public Map<String, Property> paramParsingPreparation(TaskExecutionContext taskExecutionContext, AbstractParameters parameters, ProcessInstance processInstance) {
+        Preconditions.checkNotNull(taskExecutionContext);
+        Preconditions.checkNotNull(parameters);
+        setGlobalParamsMap(taskExecutionContext);
+        Map<String, Property> globalParams = ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams());
+        Map<String,String> globalParamsMap = taskExecutionContext.getDefinedParams();
+        org.apache.dolphinscheduler.spi.enums.CommandType commandType = org.apache.dolphinscheduler.spi.enums.CommandType.of(taskExecutionContext.getCmdTypeIfComplement());

Review Comment:
   ```suggestion
           CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
   ```



##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/DolphinSchedulerCuringGlobalParams.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.expand;
+
+import com.google.common.base.Preconditions;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
+
+@Component
+public class DolphinSchedulerCuringGlobalParams implements CuringParamsService {

Review Comment:
   ```suggestion
   public class CuringGlobalParams implements CuringParamsService {
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParamUtils.java:
##########
@@ -43,77 +33,81 @@ public class ParamUtils {
     /**
      * parameter conversion
      * Warning:
-     *  When you first invoke the function of convert, the variables of localParams and varPool in the ShellParameters will be modified.
+     *  1.When you first invoke the function of convert, the variables of localParams and varPool in the ShellParameters will be modified.
      *  But in the whole system the variables of localParams and varPool have been used in other functions. I'm not sure if this current
      *  situation is wrong. So I cannot modify the original logic.
      *
+     *  2.Change time: 2022-06-30
+     *  The purpose is for external expansion of local parameters.
+     *  now the method is replaced by the paramParsingPreparation() method of DolphinSchedulerCuringGlobalParams.
+     *
      * @param taskExecutionContext the context of this task instance
      * @param parameters the parameters
      * @return global params
      *
      */
-    public static Map<String, Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) {
-        Preconditions.checkNotNull(taskExecutionContext);
-        Preconditions.checkNotNull(parameters);
-        Map<String, Property> globalParams = getUserDefParamsMap(taskExecutionContext.getDefinedParams());
-        Map<String,String> globalParamsMap = taskExecutionContext.getDefinedParams();
-        CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
-        Date scheduleTime = taskExecutionContext.getScheduleTime();
-
-        // combining local and global parameters
-        Map<String, Property> localParams = parameters.getInputLocalParametersMap();
-
-        //stream pass params
-        Map<String, Property> varParams = parameters.getVarPoolMap();
-
-        if (globalParams.size() == 0 && localParams.size() == 0 && varParams.size() == 0) {
-            return null;
-        }
-        // if it is a complement,
-        // you need to pass in the task instance id to locate the time
-        // of the process instance complement
-        Map<String,String> params = BusinessTimeUtils
-                .getBusinessTime(commandType,
-                        scheduleTime);
-
-        if (globalParamsMap != null) {
-
-            params.putAll(globalParamsMap);
-        }
-
-        if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) {
-            params.put(PARAMETER_TASK_EXECUTE_PATH, taskExecutionContext.getExecutePath());
-        }
-        params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskExecutionContext.getTaskInstanceId()));
-
-        if (varParams.size() != 0) {
-            globalParams.putAll(varParams);
-        }
-        if (localParams.size() != 0) {
-            globalParams.putAll(localParams);
-        }
-
-        Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
-        while (iter.hasNext()) {
-            Map.Entry<String, Property> en = iter.next();
-            Property property = en.getValue();
-
-            if (StringUtils.isNotEmpty(property.getValue())
-                    && property.getValue().startsWith("$")) {
-                /**
-                 *  local parameter refers to global parameter with the same name
-                 *  note: the global parameters of the process instance here are solidified parameters,
-                 *  and there are no variables in them.
-                 */
-                String val = property.getValue();
-
-                val  = ParameterUtils.convertParameterPlaceholders(val, params);
-                property.setValue(val);
-            }
-        }
-
-        return globalParams;
-    }
+//    public static Map<String, Property> convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) {

Review Comment:
   remove if no use.



##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/DolphinSchedulerCuringGlobalParams.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.expand;
+
+import com.google.common.base.Preconditions;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
+
+@Component
+public class DolphinSchedulerCuringGlobalParams implements CuringParamsService {
+
+    private static final Logger logger = LoggerFactory.getLogger(DolphinSchedulerCuringGlobalParams.class);
+
+    @Autowired
+    private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+    @Override
+    public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
+        return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
+    }
+
+    @Override
+    public boolean timeFunctionNeedExpand(String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
+    }
+
+    @Override
+    public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
+    }
+
+    @Override
+    public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
+        if (globalParamList == null || globalParamList.isEmpty()) {
+            return null;
+        }
+        Map<String, String> globalMap = new HashMap<>();
+        if (globalParamMap != null) {
+            globalMap.putAll(globalParamMap);
+        }
+        Map<String, String> allParamMap = new HashMap<>();
+        //If it is a complement, a complement time needs to be passed in, according to the task type
+        Map<String, String> timeParams = BusinessTimeUtils.
+                getBusinessTime(commandType, scheduleTime, timezone);
+
+        if (timeParams != null) {
+            allParamMap.putAll(timeParams);
+        }
+        allParamMap.putAll(globalMap);
+        Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
+        Map<String, String> resolveMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : entries) {
+            String val = entry.getValue();
+            if (val.startsWith("$")) {
+                String str = "";
+                // whether external scaling calculation is required
+                if (timeFunctionNeedExpand(val)) {
+                    str = timeFunctionExtension(processInstanceId, timezone, val);
+                } else {
+                    str = convertParameterPlaceholders(val, allParamMap);
+                }
+                resolveMap.put(entry.getKey(), str);
+            }
+        }
+        globalMap.putAll(resolveMap);
+        for (Property property : globalParamList) {
+            String val = globalMap.get(property.getProp());
+            if (val != null) {
+                property.setValue(val);
+            }
+        }
+        return JSONUtils.toJsonString(globalParamList);
+    }
+
+    @Override
+    public Map<String, Property> paramParsingPreparation(TaskExecutionContext taskExecutionContext, AbstractParameters parameters, ProcessInstance processInstance) {
+        Preconditions.checkNotNull(taskExecutionContext);
+        Preconditions.checkNotNull(parameters);
+        setGlobalParamsMap(taskExecutionContext);
+        Map<String, Property> globalParams = ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams());
+        Map<String,String> globalParamsMap = taskExecutionContext.getDefinedParams();
+        org.apache.dolphinscheduler.spi.enums.CommandType commandType = org.apache.dolphinscheduler.spi.enums.CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
+        Date scheduleTime = taskExecutionContext.getScheduleTime();
+
+        // combining local and global parameters
+        Map<String, Property> localParams = parameters.getInputLocalParametersMap();
+
+        //stream pass params
+        Map<String, Property> varParams = parameters.getVarPoolMap();
+
+        if (globalParams.size() == 0 && localParams.size() == 0 && varParams.size() == 0) {
+            return null;
+        }
+        // if it is a complement,
+        // you need to pass in the task instance id to locate the time
+        // of the process instance complement
+        Map<String,String> params = org.apache.dolphinscheduler.plugin.task.api.parser.BusinessTimeUtils
+                .getBusinessTime(commandType,
+                        scheduleTime);
+
+        if (globalParamsMap != null) {
+            params.putAll(globalParamsMap);
+        }
+
+        if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) {
+            params.put(PARAMETER_TASK_EXECUTE_PATH, taskExecutionContext.getExecutePath());
+        }
+        params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskExecutionContext.getTaskInstanceId()));
+
+        if (varParams.size() != 0) {
+            globalParams.putAll(varParams);
+        }
+        if (localParams.size() != 0) {
+            globalParams.putAll(localParams);
+        }
+
+        Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, Property> en = iter.next();
+            Property property = en.getValue();
+
+            if (StringUtils.isNotEmpty(property.getValue()) && property.getValue().startsWith("$")) {
+                /**
+                 *  local parameter refers to global parameter with the same name
+                 *  note: the global parameters of the process instance here are solidified parameters,
+                 *  and there are no variables in them.
+                 */
+                String val = property.getValue();
+                // whether external scaling calculation is required
+                if (timeFunctionNeedExpand(val)) {
+                    Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
+                    val = timeFunctionExtension(taskExecutionContext.getProcessInstanceId(), cmdParam.get(Constants.SCHEDULE_TIMEZONE), val);
+                } else {
+                    val  = org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils.convertParameterPlaceholders(val, params);

Review Comment:
   ```suggestion
                       val  = ParameterUtils.convertParameterPlaceholders(val, params);
   ```



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/ParameterUtils.java:
##########
@@ -155,54 +151,59 @@ public static void setInParameter(int index, PreparedStatement stmt, DataType da
     /**
      * curing user define parameters
      *
+     * Warning:
+     *      Change time: 2022-06-30
+     *      The purpose is for external expansion of local parameters.
+     *      now the method is replaced by the curingGlobalParams() method of DolphinSchedulerCuringGlobalParams.
+     *
      * @param globalParamMap  global param map
      * @param globalParamList global param list
      * @param commandType     command type
      * @param scheduleTime    schedule time
      * @return curing user define parameters
      */
-    public static String curingGlobalParams(Map<String, String> globalParamMap, List<Property> globalParamList,
-                                            CommandType commandType, Date scheduleTime) {
-
-        if (globalParamList == null || globalParamList.isEmpty()) {
-            return null;
-        }
-
-        Map<String, String> globalMap = new HashMap<>();
-        if (globalParamMap != null) {
-            globalMap.putAll(globalParamMap);
-        }
-        Map<String, String> allParamMap = new HashMap<>();
-        //If it is a complement, a complement time needs to be passed in, according to the task type
-        Map<String, String> timeParams = BusinessTimeUtils
-            .getBusinessTime(commandType, scheduleTime);
-
-        if (timeParams != null) {
-            allParamMap.putAll(timeParams);
-        }
-
-        allParamMap.putAll(globalMap);
-
-        Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
-
-        Map<String, String> resolveMap = new HashMap<>();
-        for (Map.Entry<String, String> entry : entries) {
-            String val = entry.getValue();
-            if (val.startsWith("$")) {
-                String str = ParameterUtils.convertParameterPlaceholders(val, allParamMap);
-                resolveMap.put(entry.getKey(), str);
-            }
-        }
-        globalMap.putAll(resolveMap);
-
-        for (Property property : globalParamList) {
-            String val = globalMap.get(property.getProp());
-            if (val != null) {
-                property.setValue(val);
-            }
-        }
-        return JSONUtils.toJsonString(globalParamList);
-    }
+//    public static String curingGlobalParams(Map<String, String> globalParamMap, List<Property> globalParamList,

Review Comment:
   same here.



##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/DolphinSchedulerCuringGlobalParams.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.expand;
+
+import com.google.common.base.Preconditions;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
+
+@Component
+public class DolphinSchedulerCuringGlobalParams implements CuringParamsService {
+
+    private static final Logger logger = LoggerFactory.getLogger(DolphinSchedulerCuringGlobalParams.class);
+
+    @Autowired
+    private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+    @Override
+    public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
+        return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
+    }
+
+    @Override
+    public boolean timeFunctionNeedExpand(String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
+    }
+
+    @Override
+    public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
+    }
+
+    @Override
+    public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
+        if (globalParamList == null || globalParamList.isEmpty()) {
+            return null;
+        }
+        Map<String, String> globalMap = new HashMap<>();
+        if (globalParamMap != null) {
+            globalMap.putAll(globalParamMap);
+        }
+        Map<String, String> allParamMap = new HashMap<>();
+        //If it is a complement, a complement time needs to be passed in, according to the task type
+        Map<String, String> timeParams = BusinessTimeUtils.
+                getBusinessTime(commandType, scheduleTime, timezone);
+
+        if (timeParams != null) {
+            allParamMap.putAll(timeParams);
+        }
+        allParamMap.putAll(globalMap);
+        Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
+        Map<String, String> resolveMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : entries) {
+            String val = entry.getValue();
+            if (val.startsWith("$")) {
+                String str = "";
+                // whether external scaling calculation is required
+                if (timeFunctionNeedExpand(val)) {
+                    str = timeFunctionExtension(processInstanceId, timezone, val);
+                } else {
+                    str = convertParameterPlaceholders(val, allParamMap);
+                }
+                resolveMap.put(entry.getKey(), str);
+            }
+        }
+        globalMap.putAll(resolveMap);
+        for (Property property : globalParamList) {
+            String val = globalMap.get(property.getProp());
+            if (val != null) {
+                property.setValue(val);
+            }
+        }
+        return JSONUtils.toJsonString(globalParamList);
+    }
+
+    @Override
+    public Map<String, Property> paramParsingPreparation(TaskExecutionContext taskExecutionContext, AbstractParameters parameters, ProcessInstance processInstance) {
+        Preconditions.checkNotNull(taskExecutionContext);
+        Preconditions.checkNotNull(parameters);
+        setGlobalParamsMap(taskExecutionContext);
+        Map<String, Property> globalParams = ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams());
+        Map<String,String> globalParamsMap = taskExecutionContext.getDefinedParams();
+        org.apache.dolphinscheduler.spi.enums.CommandType commandType = org.apache.dolphinscheduler.spi.enums.CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
+        Date scheduleTime = taskExecutionContext.getScheduleTime();
+
+        // combining local and global parameters
+        Map<String, Property> localParams = parameters.getInputLocalParametersMap();
+
+        //stream pass params
+        Map<String, Property> varParams = parameters.getVarPoolMap();
+
+        if (globalParams.size() == 0 && localParams.size() == 0 && varParams.size() == 0) {
+            return null;
+        }
+        // if it is a complement,
+        // you need to pass in the task instance id to locate the time
+        // of the process instance complement
+        Map<String,String> params = org.apache.dolphinscheduler.plugin.task.api.parser.BusinessTimeUtils
+                .getBusinessTime(commandType,
+                        scheduleTime);
+
+        if (globalParamsMap != null) {
+            params.putAll(globalParamsMap);
+        }
+
+        if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) {
+            params.put(PARAMETER_TASK_EXECUTE_PATH, taskExecutionContext.getExecutePath());
+        }
+        params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskExecutionContext.getTaskInstanceId()));
+
+        if (varParams.size() != 0) {
+            globalParams.putAll(varParams);
+        }
+        if (localParams.size() != 0) {
+            globalParams.putAll(localParams);
+        }
+
+        Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, Property> en = iter.next();
+            Property property = en.getValue();
+
+            if (StringUtils.isNotEmpty(property.getValue()) && property.getValue().startsWith("$")) {

Review Comment:
   same here



##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/DolphinSchedulerCuringGlobalParams.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.expand;
+
+import com.google.common.base.Preconditions;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
+
+@Component
+public class DolphinSchedulerCuringGlobalParams implements CuringParamsService {
+
+    private static final Logger logger = LoggerFactory.getLogger(DolphinSchedulerCuringGlobalParams.class);
+
+    @Autowired
+    private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+    @Override
+    public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
+        return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
+    }
+
+    @Override
+    public boolean timeFunctionNeedExpand(String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
+    }
+
+    @Override
+    public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
+    }
+
+    @Override
+    public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
+        if (globalParamList == null || globalParamList.isEmpty()) {
+            return null;
+        }
+        Map<String, String> globalMap = new HashMap<>();
+        if (globalParamMap != null) {
+            globalMap.putAll(globalParamMap);
+        }
+        Map<String, String> allParamMap = new HashMap<>();
+        //If it is a complement, a complement time needs to be passed in, according to the task type
+        Map<String, String> timeParams = BusinessTimeUtils.
+                getBusinessTime(commandType, scheduleTime, timezone);
+
+        if (timeParams != null) {
+            allParamMap.putAll(timeParams);
+        }
+        allParamMap.putAll(globalMap);
+        Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
+        Map<String, String> resolveMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : entries) {
+            String val = entry.getValue();
+            if (val.startsWith("$")) {
+                String str = "";
+                // whether external scaling calculation is required
+                if (timeFunctionNeedExpand(val)) {
+                    str = timeFunctionExtension(processInstanceId, timezone, val);
+                } else {
+                    str = convertParameterPlaceholders(val, allParamMap);
+                }
+                resolveMap.put(entry.getKey(), str);
+            }
+        }
+        globalMap.putAll(resolveMap);
+        for (Property property : globalParamList) {
+            String val = globalMap.get(property.getProp());
+            if (val != null) {
+                property.setValue(val);
+            }
+        }
+        return JSONUtils.toJsonString(globalParamList);
+    }
+
+    @Override
+    public Map<String, Property> paramParsingPreparation(TaskExecutionContext taskExecutionContext, AbstractParameters parameters, ProcessInstance processInstance) {
+        Preconditions.checkNotNull(taskExecutionContext);
+        Preconditions.checkNotNull(parameters);
+        setGlobalParamsMap(taskExecutionContext);
+        Map<String, Property> globalParams = ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams());
+        Map<String,String> globalParamsMap = taskExecutionContext.getDefinedParams();
+        org.apache.dolphinscheduler.spi.enums.CommandType commandType = org.apache.dolphinscheduler.spi.enums.CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
+        Date scheduleTime = taskExecutionContext.getScheduleTime();
+
+        // combining local and global parameters
+        Map<String, Property> localParams = parameters.getInputLocalParametersMap();
+
+        //stream pass params
+        Map<String, Property> varParams = parameters.getVarPoolMap();
+
+        if (globalParams.size() == 0 && localParams.size() == 0 && varParams.size() == 0) {
+            return null;
+        }
+        // if it is a complement,
+        // you need to pass in the task instance id to locate the time
+        // of the process instance complement
+        Map<String,String> params = org.apache.dolphinscheduler.plugin.task.api.parser.BusinessTimeUtils
+                .getBusinessTime(commandType,
+                        scheduleTime);
+
+        if (globalParamsMap != null) {
+            params.putAll(globalParamsMap);
+        }
+
+        if (StringUtils.isNotBlank(taskExecutionContext.getExecutePath())) {
+            params.put(PARAMETER_TASK_EXECUTE_PATH, taskExecutionContext.getExecutePath());
+        }
+        params.put(PARAMETER_TASK_INSTANCE_ID, Integer.toString(taskExecutionContext.getTaskInstanceId()));
+
+        if (varParams.size() != 0) {
+            globalParams.putAll(varParams);
+        }
+        if (localParams.size() != 0) {
+            globalParams.putAll(localParams);
+        }
+
+        Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, Property> en = iter.next();
+            Property property = en.getValue();
+
+            if (StringUtils.isNotEmpty(property.getValue()) && property.getValue().startsWith("$")) {
+                /**
+                 *  local parameter refers to global parameter with the same name
+                 *  note: the global parameters of the process instance here are solidified parameters,
+                 *  and there are no variables in them.
+                 */
+                String val = property.getValue();
+                // whether external scaling calculation is required
+                if (timeFunctionNeedExpand(val)) {
+                    Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
+                    val = timeFunctionExtension(taskExecutionContext.getProcessInstanceId(), cmdParam.get(Constants.SCHEDULE_TIMEZONE), val);
+                } else {
+                    val  = org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils.convertParameterPlaceholders(val, params);
+                }
+                property.setValue(val);
+            }
+        }
+        if (org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils.isEmpty(globalParams)) {

Review Comment:
   ```suggestion
           if (MapUtils.isEmpty(globalParams)) {
   ```



##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/DolphinSchedulerCuringGlobalParams.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.expand;
+
+import com.google.common.base.Preconditions;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
+
+@Component
+public class DolphinSchedulerCuringGlobalParams implements CuringParamsService {
+
+    private static final Logger logger = LoggerFactory.getLogger(DolphinSchedulerCuringGlobalParams.class);
+
+    @Autowired
+    private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+    @Override
+    public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
+        return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
+    }
+
+    @Override
+    public boolean timeFunctionNeedExpand(String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
+    }
+
+    @Override
+    public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
+    }
+
+    @Override
+    public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
+        if (globalParamList == null || globalParamList.isEmpty()) {
+            return null;
+        }
+        Map<String, String> globalMap = new HashMap<>();
+        if (globalParamMap != null) {
+            globalMap.putAll(globalParamMap);
+        }
+        Map<String, String> allParamMap = new HashMap<>();
+        //If it is a complement, a complement time needs to be passed in, according to the task type
+        Map<String, String> timeParams = BusinessTimeUtils.
+                getBusinessTime(commandType, scheduleTime, timezone);
+
+        if (timeParams != null) {
+            allParamMap.putAll(timeParams);
+        }
+        allParamMap.putAll(globalMap);
+        Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
+        Map<String, String> resolveMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : entries) {
+            String val = entry.getValue();
+            if (val.startsWith("$")) {
+                String str = "";
+                // whether external scaling calculation is required
+                if (timeFunctionNeedExpand(val)) {
+                    str = timeFunctionExtension(processInstanceId, timezone, val);
+                } else {
+                    str = convertParameterPlaceholders(val, allParamMap);
+                }
+                resolveMap.put(entry.getKey(), str);
+            }
+        }
+        globalMap.putAll(resolveMap);
+        for (Property property : globalParamList) {
+            String val = globalMap.get(property.getProp());
+            if (val != null) {
+                property.setValue(val);
+            }
+        }
+        return JSONUtils.toJsonString(globalParamList);
+    }
+
+    @Override
+    public Map<String, Property> paramParsingPreparation(TaskExecutionContext taskExecutionContext, AbstractParameters parameters, ProcessInstance processInstance) {

Review Comment:
   please add some comments.



##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/DolphinSchedulerCuringGlobalParams.java:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.expand;
+
+import com.google.common.base.Preconditions;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
+import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_EXECUTE_PATH;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.PARAMETER_TASK_INSTANCE_ID;
+
+@Component
+public class DolphinSchedulerCuringGlobalParams implements CuringParamsService {
+
+    private static final Logger logger = LoggerFactory.getLogger(DolphinSchedulerCuringGlobalParams.class);
+
+    @Autowired
+    private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
+
+    @Override
+    public String convertParameterPlaceholders(String val, Map<String, String> allParamMap) {
+        return ParameterUtils.convertParameterPlaceholders(val, allParamMap);
+    }
+
+    @Override
+    public boolean timeFunctionNeedExpand(String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionNeedExpand(placeholderName);
+    }
+
+    @Override
+    public String timeFunctionExtension(Integer processInstanceId, String timezone, String placeholderName) {
+        return timePlaceholderResolverExpandService.timeFunctionExtension(processInstanceId, timezone, placeholderName);
+    }
+
+    @Override
+    public String curingGlobalParams(Integer processInstanceId, Map<String, String> globalParamMap, List<Property> globalParamList, CommandType commandType, Date scheduleTime, String timezone) {
+        if (globalParamList == null || globalParamList.isEmpty()) {
+            return null;
+        }
+        Map<String, String> globalMap = new HashMap<>();
+        if (globalParamMap != null) {
+            globalMap.putAll(globalParamMap);
+        }
+        Map<String, String> allParamMap = new HashMap<>();
+        //If it is a complement, a complement time needs to be passed in, according to the task type
+        Map<String, String> timeParams = BusinessTimeUtils.
+                getBusinessTime(commandType, scheduleTime, timezone);
+
+        if (timeParams != null) {
+            allParamMap.putAll(timeParams);
+        }
+        allParamMap.putAll(globalMap);
+        Set<Map.Entry<String, String>> entries = allParamMap.entrySet();
+        Map<String, String> resolveMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : entries) {
+            String val = entry.getValue();
+            if (val.startsWith("$")) {

Review Comment:
   use constant



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org