You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/06/25 03:37:31 UTC

[dolphinscheduler] branch dev updated: [Feature-#5565][Master Worker-Server] Global Param passed by sense dependencies (#5603)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6df58e8  [Feature-#5565][Master Worker-Server] Global Param passed by sense dependencies (#5603)
6df58e8 is described below

commit 6df58e8d7830b09d42f7a6395780e53f292116cd
Author: wangxj3 <85...@qq.com>
AuthorDate: Fri Jun 25 11:37:23 2021 +0800

    [Feature-#5565][Master Worker-Server] Global Param passed by sense dependencies (#5603)
    
    * add globalParams new plan with varPool
    
    * add unit test
    
    * add python task varPoolParams
    
    
    Co-authored-by: wangxj <wangxj31>
---
 .../dolphinscheduler/common/enums/DataType.java    |   3 +-
 .../common/task/AbstractParameters.java            | 182 ++++++++++++++++-----
 .../common/task/shell/ShellParameters.java         |  60 +++----
 .../common/task/sql/SqlParameters.java             |  54 ++++++
 .../common/utils/VarPoolUtils.java                 |   3 +-
 .../common/task/EntityTestUtils.java               |   2 +-
 .../common/task/SqlParametersTest.java             |  30 +++-
 .../remote/command/TaskExecuteResponseCommand.java |  12 --
 .../builder/TaskExecutionContextBuilder.java       |  14 +-
 .../server/entity/TaskExecutionContext.java        |  13 ++
 .../master/processor/TaskResponseProcessor.java    |   3 +-
 .../master/processor/queue/TaskResponseEvent.java  |  15 +-
 .../processor/queue/TaskResponseService.java       |   3 +-
 .../server/master/runner/MasterExecThread.java     |  90 ++++++----
 .../dolphinscheduler/server/utils/ParamUtils.java  |   8 +-
 .../server/worker/runner/TaskExecuteThread.java    |   7 +-
 .../worker/task/AbstractCommandExecutor.java       |  13 --
 .../server/worker/task/AbstractTask.java           |  28 +---
 .../server/worker/task/datax/DataxTask.java        |   1 +
 .../server/worker/task/flink/FlinkTask.java        |   1 +
 .../server/worker/task/http/HttpTask.java          |   1 +
 .../server/worker/task/mr/MapReduceTask.java       |   1 +
 .../worker/task/procedure/ProcedureTask.java       |   1 +
 .../server/worker/task/python/PythonTask.java      |   3 +-
 .../server/worker/task/shell/ShellTask.java        |  17 +-
 .../server/worker/task/spark/SparkTask.java        |   1 +
 .../server/worker/task/sql/SqlTask.java            |  11 +-
 .../server/worker/task/sqoop/SqoopTask.java        |   1 +
 .../server/master/MasterExecThreadTest.java        |  48 ++++++
 .../dolphinscheduler/server/master/ParamsTest.java |  33 ++--
 .../processor/queue/TaskResponseServiceTest.java   |   3 +-
 .../server/utils/ParamUtilsTest.java               |  42 +++--
 .../worker/processor/TaskCallbackServiceTest.java  |   2 -
 .../worker/processor/TaskKillProcessorTest.java    |   6 +-
 .../worker/runner/TaskExecuteThreadTest.java       |   3 +-
 .../worker/task/AbstractCommandExecutorTest.java   |  53 ------
 .../server/worker/task/ShellTaskReturnTest.java    |  11 --
 .../server/worker/task/TaskManagerTest.java        |   5 -
 .../server/worker/task/TaskParamsTest.java         |  77 +++++++++
 .../server/worker/task/shell/ShellTaskTest.java    |  12 +-
 .../server/worker/task/sql/SqlTaskTest.java        |   2 +
 .../service/process/ProcessService.java            |  55 ++-----
 .../service/process/ProcessServiceTest.java        |  15 ++
 pom.xml                                            |   2 +-
 44 files changed, 591 insertions(+), 356 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java
index eda00d8..2b0930d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DataType.java
@@ -30,6 +30,7 @@ public enum DataType {
      * 6 time, "HH:MM:SS"
      * 7 time stamp
      * 8 Boolean
+     * 9 list <String>
      */
-    VARCHAR,INTEGER,LONG,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,BOOLEAN
+    VARCHAR,INTEGER,LONG,FLOAT,DOUBLE,DATE,TIME,TIMESTAMP,BOOLEAN,LIST
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
index 929516c..686642d 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/AbstractParameters.java
@@ -16,55 +16,163 @@
  */
 package org.apache.dolphinscheduler.common.task;
 
+import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
 /**
  * job params related class
  */
 public abstract class AbstractParameters implements IParameters {
 
-  @Override
-  public abstract boolean checkParameters();
-
-  @Override
-  public abstract List<ResourceInfo> getResourceFilesList();
-
-  /**
-   * local parameters
-   */
-  public List<Property> localParams;
-
-  /**
-   * get local parameters list
-   * @return Property list
-   */
-  public List<Property> getLocalParams() {
-    return localParams;
-  }
-
-  public void setLocalParams(List<Property> localParams) {
-    this.localParams = localParams;
-  }
-
-  /**
-   * get local parameters map
-   * @return parameters map
-   */
-  public Map<String,Property> getLocalParametersMap() {
-      if (localParams != null) {
-        Map<String,Property> localParametersMaps = new LinkedHashMap<>();
-
-        for (Property property : localParams) {
-          localParametersMaps.put(property.getProp(),property);
+    @Override
+    public abstract boolean checkParameters();
+
+    @Override
+    public abstract List<ResourceInfo> getResourceFilesList();
+
+    /**
+     * local parameters
+     */
+    public List<Property> localParams;
+
+    /**
+     * var pool
+     */
+    public List<Property> varPool;
+
+    /**
+     * get local parameters list
+     *
+     * @return Property list
+     */
+    public List<Property> getLocalParams() {
+        return localParams;
+    }
+
+    public void setLocalParams(List<Property> localParams) {
+        this.localParams = localParams;
+    }
+
+    /**
+     * index the local parameters by prop name in list order (LinkedHashMap)
+     * if two entries share a prop name, the later one replaces the earlier one
+     * @return map of prop name to Property, or null when localParams is null
+     */
+    public Map<String, Property> getLocalParametersMap() {
+        if (localParams != null) {
+            Map<String, Property> localParametersMaps = new LinkedHashMap<>();
+
+            for (Property property : localParams) {
+                localParametersMaps.put(property.getProp(), property);
+            }
+            return localParametersMaps;
+        }
+        return null;
+    }
+
+    /**
+     * index the varPool entries by prop name in list order (LinkedHashMap)
+     * if two entries share a prop name, the later one replaces the earlier one
+     * @return map of prop name to Property, or null when varPool is null
+     */
+    public Map<String, Property> getVarPoolMap() {
+        if (varPool != null) {
+            Map<String, Property> varPoolMap = new LinkedHashMap<>();
+            for (Property property : varPool) {
+                varPoolMap.put(property.getProp(), property);
+            }
+            return varPoolMap;
+        }
+        return null;
+    }
+
+    public List<Property> getVarPool() {
+        return varPool;
+    }
+
+    public void setVarPool(String varPool) {
+        if (StringUtils.isEmpty(varPool)) {
+            this.varPool = new ArrayList<>();
+        } else {
+            this.varPool = JSONUtils.toList(varPool, Property.class);
+        }
+    }
+
+    public void dealOutParam(String result) { // publish this task's OUT local params into varPool
+        List<Property> outProperty = getOutProperty(localParams); // empty list when localParams is null or empty
+        if (CollectionUtils.isEmpty(outProperty)) {
+            return;
+        }
+        if (varPool == null) {
+            varPool = new ArrayList<>(); // field has no initializer; create it before appending OUT values
+        }
+        if (StringUtils.isEmpty(result)) {
+            varPool.addAll(outProperty); // no task output: OUT params keep their configured values
+            return;
+        }
+        Map<String, String> taskResult = getMapByString(result);
+        if (taskResult == null || taskResult.size() == 0) {
+            return;
+        }
+        for (Property info : outProperty) {
+            info.setValue(taskResult.get(info.getProp())); // null when the output has no entry for this prop
+            varPool.add(info);
+        }
+    }
+
+    public List<Property> getOutProperty(List<Property> params) {
+        if (CollectionUtils.isEmpty(params)) {
+            return new ArrayList<>();
+        }
+        List<Property> result = new ArrayList<>();
+        for (Property info : params) {
+            if (info.getDirect() == Direct.OUT) {
+                result.add(info);
+            }
+        }
+        return result;
+    }
+
+    public List<Map<String, String>> getListMapByString(String json) {
+        List<Map<String, String>> allParams = new ArrayList<>();
+        ArrayNode paramsByJson = JSONUtils.parseArray(json);
+        Iterator<JsonNode> listIterator = paramsByJson.iterator();
+        while (listIterator.hasNext()) {
+            Map<String, String> param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
+            allParams.add(param);
+        }
+        return allParams;
+    }
+
+    /**
+     * parse a shell result of the form key=value$VarPool$key=value$VarPool$ into a map
+     * @param result raw task output, segments separated by the literal $VarPool$ (must not be null)
+     * @return key to value map; segments without "=" are skipped, a value may be empty or contain "="
+     */
+    public static Map<String, String> getMapByString(String result) {
+        String[] formatResult = result.split("\\$VarPool\\$");
+        Map<String, String> format = new HashMap<>();
+        for (String info : formatResult) {
+            if (StringUtils.isNotEmpty(info) && info.contains("=")) {
+                String[] keyValue = info.split("=", 2); // limit 2: always two parts, so "key=" and "a=b=c" are safe
+                format.put(keyValue[0], keyValue[1]);
+            }
         }
-        return localParametersMaps;
-      }
-      return null;
-  }
+        return format;
+    }
 
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
index e11e596..7388cd3 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/shell/ShellParameters.java
@@ -14,52 +14,52 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.common.task.shell;
 
+package org.apache.dolphinscheduler.common.task.shell;
 
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * shell parameters
  */
 public class ShellParameters extends AbstractParameters {
-  /**
-   * shell script
-   */
-  private String rawScript;
+    /**
+     * shell script
+     */
+    private String rawScript;
+
+    /**
+     * resource list
+     */
+    private List<ResourceInfo> resourceList;
 
-  /**
-   * resource list
-   */
-  private List<ResourceInfo> resourceList;
+    public String getRawScript() {
+        return rawScript;
+    }
 
-  public String getRawScript() {
-    return rawScript;
-  }
+    public void setRawScript(String rawScript) {
+        this.rawScript = rawScript;
+    }
 
-  public void setRawScript(String rawScript) {
-    this.rawScript = rawScript;
-  }
+    public List<ResourceInfo> getResourceList() {
+        return resourceList;
+    }
 
-  public List<ResourceInfo> getResourceList() {
-    return resourceList;
-  }
+    public void setResourceList(List<ResourceInfo> resourceList) {
+        this.resourceList = resourceList;
+    }
 
-  public void setResourceList(List<ResourceInfo> resourceList) {
-    this.resourceList = resourceList;
-  }
+    @Override
+    public boolean checkParameters() {
+        return rawScript != null && !rawScript.isEmpty();
+    }
 
-  @Override
-  public boolean checkParameters() {
-    return rawScript != null && !rawScript.isEmpty();
-  }
+    @Override
+    public List<ResourceInfo> getResourceFilesList() {
+        return resourceList;
+    }
 
-  @Override
-  public List<ResourceInfo> getResourceFilesList() {
-    return resourceList;
-  }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
index 86989e1..59259a5 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
@@ -17,12 +17,19 @@
 
 package org.apache.dolphinscheduler.common.task.sql;
 
+import org.apache.dolphinscheduler.common.enums.DataType;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Sql/Hql parameter
@@ -219,6 +226,53 @@ public class SqlParameters extends AbstractParameters {
     }
 
     @Override
+    public void dealOutParam(String result) {
+        List<Property> outProperty = getOutProperty(localParams); // empty list when localParams is null or empty
+        if (CollectionUtils.isEmpty(outProperty)) {
+            return;
+        }
+        if (varPool == null) {
+            varPool = new ArrayList<>(); // field has no initializer; create it before appending OUT values
+        }
+        if (StringUtils.isEmpty(result)) {
+            varPool.addAll(outProperty);
+            return;
+        }
+        List<Map<String, String>> sqlResult = getListMapByString(result);
+        if (CollectionUtils.isEmpty(sqlResult)) {
+            return;
+        }
+        //more than one row: gather each column into a list; only LIST-typed OUT params are published
+        if (sqlResult.size() > 1) {
+            Map<String, List<String>> sqlResultFormat = new HashMap<>();
+            //pre-create a list for every column of the first row
+            Set<String> keySet = sqlResult.get(0).keySet();
+            for (String key : keySet) {
+                sqlResultFormat.put(key, new ArrayList<>());
+            }
+            for (Map<String, String> info : sqlResult) {
+                for (String key : info.keySet()) {
+                    sqlResultFormat.computeIfAbsent(key, k -> new ArrayList<>()).add(String.valueOf(info.get(key)));
+                }
+            }
+            for (Property info : outProperty) {
+                if (info.getType() == DataType.LIST) {
+                    info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
+                    varPool.add(info);
+                }
+            }
+        } else {
+            //exactly one row: each OUT param takes the value of the column with the same name
+            Map<String, String> firstRow = sqlResult.get(0);
+            for (Property info : outProperty) {
+                info.setValue(String.valueOf(firstRow.get(info.getProp())));
+                varPool.add(info);
+            }
+        }
+
+    }
+
+    @Override
     public String toString() {
         return "SqlParameters{"
                 + "type='" + type + '\''
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
index cd300e3..f286300 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
 
 import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Map;
 
 public class VarPoolUtils {
@@ -71,7 +70,7 @@ public class VarPoolUtils {
             if (kvs.length == 2) {
                 propToValue.put(kvs[0], kvs[1]);
             } else {
-                throw new ParseException(kv, 2);
+                return;
             }
         }
     }
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java
index 5d867bc..8e9b451 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java
@@ -32,7 +32,7 @@ public class EntityTestUtils {
 
     static {
         OBJECT_MAP.put("java.lang.Long", 1L);
-        OBJECT_MAP.put("java.lang.String", "test");
+        OBJECT_MAP.put("java.lang.String", "[{\"direct\":\"OUT\",\"prop\":\"percentage5\",\"type\":\"VARCHAR\",\"value\":\"qwe\"}]");
         OBJECT_MAP.put("java.lang.Integer", 1);
         OBJECT_MAP.put("int", 1);
         OBJECT_MAP.put("long", 1L);
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java
index 57b0d60..3cd3f46 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqlParametersTest.java
@@ -17,9 +17,17 @@
 
 package org.apache.dolphinscheduler.common.task;
 
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.dolphinscheduler.common.enums.DataType;
+import org.apache.dolphinscheduler.common.enums.Direct;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -39,6 +47,14 @@ public class SqlParametersTest {
 
     @Test
     public void testSqlParameters() {
+        List<Property> properties = new ArrayList<>();
+        Property property = new Property();
+        property.setProp("test1");
+        property.setDirect(Direct.OUT);
+        property.setType(DataType.VARCHAR);
+        property.setValue("test1");
+        properties.add(property);
+
         SqlParameters sqlParameters = new SqlParameters();
         Assert.assertTrue(CollectionUtils.isEmpty(sqlParameters.getResourceFilesList()));
 
@@ -66,6 +82,18 @@ public class SqlParametersTest {
         Assert.assertEquals(groupId, sqlParameters.getGroupId());
         Assert.assertEquals(limit, sqlParameters.getLimit());
 
-        Assert.assertTrue(sqlParameters.checkParameters());
+        String sqlResult = "[{\"id\":6,\"test1\":\"6\"},{\"id\":70002,\"test1\":\"+1\"}]";
+        String sqlResult1 = "[{\"id\":6,\"test1\":\"6\"}]";
+        sqlParameters.setLocalParams(properties);
+        sqlParameters.varPool = new ArrayList<>();
+        sqlParameters.dealOutParam(sqlResult1);
+        assertNotNull(sqlParameters.getVarPool().get(0));
+
+        property.setType(DataType.LIST);
+        properties.clear();
+        properties.add(property);
+        sqlParameters.setLocalParams(properties);
+        sqlParameters.dealOutParam(sqlResult);
+        assertNotNull(sqlParameters.getVarPool().get(0));
     }
 }
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index 93cc3ea..de5b82c 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -68,10 +68,6 @@ public class TaskExecuteResponseCommand implements Serializable {
      * varPool string
      */
     private String varPool;
-    /**
-     * task return result
-     */
-    private String result;
 
     public void setVarPool(String varPool) {
         this.varPool = varPool;
@@ -143,12 +139,4 @@ public class TaskExecuteResponseCommand implements Serializable {
                 + ", appIds='" + appIds + '\''
                 + '}';
     }
-
-    public String getResult() {
-        return result;
-    }
-
-    public void setResult(String result) {
-        this.result = result;
-    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index da46e4d..c1cca3a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -21,8 +21,15 @@ import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UN
 
 import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
 import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.server.entity.*;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
 /**
  *  TaskExecutionContext builder
@@ -41,7 +48,7 @@ public class TaskExecutionContextBuilder {
      * @param taskInstance taskInstance
      * @return TaskExecutionContextBuilder
      */
-    public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance){
+    public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance taskInstance) {
         taskExecutionContext.setTaskInstanceId(taskInstance.getId());
         taskExecutionContext.setTaskName(taskInstance.getName());
         taskExecutionContext.setFirstSubmitTime(taskInstance.getFirstSubmitTime());
@@ -52,6 +59,7 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setHost(taskInstance.getHost());
         taskExecutionContext.setResources(taskInstance.getResources());
         taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
+        taskExecutionContext.setVarPool(taskInstance.getVarPool());
         return this;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 8490849..7a47107 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -217,6 +217,11 @@ public class TaskExecutionContext implements Serializable {
     private SqoopTaskExecutionContext sqoopTaskExecutionContext;
 
     /**
+     * taskInstance varPool
+     */
+    private String varPool;
+
+    /**
      * procedure TaskExecutionContext
      */
     private ProcedureTaskExecutionContext procedureTaskExecutionContext;
@@ -556,4 +561,12 @@ public class TaskExecutionContext implements Serializable {
                 + ", procedureTaskExecutionContext=" + procedureTaskExecutionContext
                 + '}';
     }
+
+    public String getVarPool() {
+        return varPool;
+    }
+
+    public void setVarPool(String varPool) {
+        this.varPool = varPool;
+    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 186c4f3..c307b2c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -80,8 +80,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
                 responseCommand.getAppIds(),
                 responseCommand.getTaskInstanceId(),
                 responseCommand.getVarPool(),
-                channel,
-                responseCommand.getResult()
+                channel
                 );
         taskResponseService.addResponse(taskResponseEvent);
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 9789bcc..05466e8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -92,10 +92,6 @@ public class TaskResponseEvent {
      * channel
      */
     private Channel channel;
-    /**
-     * task return result
-     */
-    private String result;
     
     public static TaskResponseEvent newAck(ExecutionStatus state,
                                            Date startTime,
@@ -122,8 +118,7 @@ public class TaskResponseEvent {
                                               String appIds,
                                               int taskInstanceId,
                                               String varPool,
-                                              Channel channel,
-                                              String result) {
+                                              Channel channel) {
         TaskResponseEvent event = new TaskResponseEvent();
         event.setState(state);
         event.setEndTime(endTime);
@@ -133,7 +128,6 @@ public class TaskResponseEvent {
         event.setEvent(Event.RESULT);
         event.setVarPool(varPool);
         event.setChannel(channel);
-        event.setResult(result);
         return event;
     }
 
@@ -233,11 +227,4 @@ public class TaskResponseEvent {
         this.channel = channel;
     }
 
-    public String getResult() {
-        return result;
-    }
-
-    public void setResult(String result) {
-        this.result = result;
-    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index f3f2e7f..1b5eddb 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -165,8 +165,7 @@ public class TaskResponseService {
                             taskResponseEvent.getProcessId(),
                             taskResponseEvent.getAppIds(),
                             taskResponseEvent.getTaskInstanceId(),
-                            taskResponseEvent.getVarPool(),
-                                taskResponseEvent.getResult()
+                            taskResponseEvent.getVarPool()
                         );
                     }
                     // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 14dcfee..c18ed78 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -22,7 +22,6 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
 import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
 import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
 
 import org.apache.dolphinscheduler.common.Constants;
@@ -47,7 +46,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProjectUser;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
@@ -60,7 +58,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 
-import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -470,8 +467,6 @@ public class MasterExecThread implements Runnable {
      * @return TaskInstance
      */
     private TaskInstance createTaskInstance(ProcessInstance processInstance, TaskNode taskNode) {
-        //update processInstance for update the globalParams
-        this.processInstance = this.processService.findProcessInstanceById(this.processInstance.getId());
         TaskInstance taskInstance = findTaskIfExists(taskNode.getCode(), taskNode.getVersion());
         if (taskInstance == null) {
             taskInstance = new TaskInstance();
@@ -503,6 +498,9 @@ public class MasterExecThread implements Runnable {
             // retry task instance interval
             taskInstance.setRetryInterval(taskNode.getRetryInterval());
 
+            //set task param
+            taskInstance.setTaskParams(taskNode.getTaskParams());
+
             // task instance priority
             if (taskNode.getTaskInstancePriority() == null) {
                 taskInstance.setTaskInstancePriority(Priority.MEDIUM);
@@ -518,54 +516,74 @@ public class MasterExecThread implements Runnable {
             } else {
                 taskInstance.setWorkerGroup(taskWorkerGroup);
             }
-            taskInstance.setTaskParams(globalParamToTaskParams(taskNode.getTaskParams()));
             // delay execution time
             taskInstance.setDelayTime(taskNode.getDelayTime());
         }
+
+        // look up the predecessor tasks and pass all of their varPool entries to this task
+        Set<String> preTask =  dag.getPreviousNodes(taskInstance.getName());
+        getPreVarPool(taskInstance, preTask);
         return taskInstance;
     }
 
-    private String globalParamToTaskParams(String params) {
-        String globalParams = this.processInstance.getGlobalParams();
-        if (StringUtils.isBlank(globalParams)) {
-            return params;
-        }
-        Map<String, String> globalMap = processService.getGlobalParamMap(globalParams);
-        if (globalMap == null || globalMap.size() == 0) {
-            return params;
-        }
-        // the process global param save in localParams
-        Map<String, Object> result = JSONUtils.toMap(params, String.class, Object.class);
-        Object localParams = result.get(LOCAL_PARAMS);
-        if (localParams != null) {
-            List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
-            for (Property info : allParam) {
-                String paramName = info.getProp();
-                if (StringUtils.isNotEmpty(paramName) && propToValue.containsKey(paramName)) {
-                    info.setValue((String) propToValue.get(paramName));
+    public void getPreVarPool(TaskInstance taskInstance,  Set<String> preTask) {
+        Map<String,Property> allProperty = new HashMap<>();
+        Map<String,TaskInstance> allTaskInstance = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(preTask)) {
+            for (String preTaskName : preTask) {
+                TaskInstance preTaskInstance = completeTaskList.get(preTaskName);
+                if (preTaskInstance == null) {
+                    continue;
                 }
-                if (info.getDirect().equals(Direct.IN)) {
-                    String value = globalMap.get(paramName);
-                    if (StringUtils.isNotEmpty(value)) {
-                        info.setValue(value);
+                String preVarPool = preTaskInstance.getVarPool();
+                if (StringUtils.isNotEmpty(preVarPool)) {
+                    List<Property> properties = JSONUtils.toList(preVarPool, Property.class);
+                    for (Property info : properties) {
+                        setVarPoolValue(allProperty, allTaskInstance, preTaskInstance, info);
                     }
                 }
             }
-            result.put(LOCAL_PARAMS, allParam);
+            if (allProperty.size() > 0) {
+                taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values()));
+            }
+        }
+    }
+
+    private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
+        // from this task instance's point of view, every inherited param is an IN param.
+        thisProperty.setDirect(Direct.IN);
+        //get the pre taskInstance Property's name
+        String proName = thisProperty.getProp();
+        // if a previous node has already supplied a Property with the same name
+        if (allProperty.containsKey(proName)) {
+            // compare the values of the two Properties
+            Property otherPro = allProperty.get(proName);
+            // if the current property's value is empty, keep the other one, whether or not its value is empty
+            if (StringUtils.isEmpty(thisProperty.getValue())) {
+                allProperty.put(proName, otherPro);
+                // if the current property's value is not empty and the other's is not empty either, use the value from the task that ended earlier
+            } else if (StringUtils.isNotEmpty(otherPro.getValue())) {
+                TaskInstance otherTask = allTaskInstance.get(proName);
+                if (otherTask.getEndTime().getTime() > preTaskInstance.getEndTime().getTime()) {
+                    allProperty.put(proName, thisProperty);
+                    allTaskInstance.put(proName,preTaskInstance);
+                } else {
+                    allProperty.put(proName, otherPro);
+                }
+            } else {
+                allProperty.put(proName, thisProperty);
+                allTaskInstance.put(proName,preTaskInstance);
+            }
+        } else {
+            allProperty.put(proName, thisProperty);
+            allTaskInstance.put(proName,preTaskInstance);
         }
-        return JSONUtils.toJsonString(result);
     }
 
     private void submitPostNode(String parentNodeName) {
         Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
         List<TaskInstance> taskInstances = new ArrayList<>();
         for (String taskNode : submitTaskNodeList) {
-            try {
-                VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool());
-            } catch (ParseException e) {
-                logger.error("parse {} exception", processInstance.getVarPool(), e);
-                throw new RuntimeException();
-            }
             TaskNode taskNodeObject = dag.getNode(taskNode);
             taskInstances.add(createTaskInstance(processInstance, taskNodeObject));
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index 875c69c..a49d915 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -47,6 +47,7 @@ public class ParamUtils {
     public static Map<String,Property> convert(Map<String,Property> globalParams,
                                                            Map<String,String> globalParamsMap,
                                                            Map<String,Property> localParams,
+                                                           Map<String,Property> varParams,
                                                            CommandType commandType,
                                                            Date scheduleTime) {
         if (globalParams == null && localParams == null) {
@@ -64,10 +65,15 @@ public class ParamUtils {
         }
 
         if (globalParams != null && localParams != null) {
-            globalParams.putAll(localParams);
+            localParams.putAll(globalParams);
+            globalParams = localParams;
         } else if (globalParams == null && localParams != null) {
             globalParams = localParams;
         }
+        if (varParams != null) {
+            varParams.putAll(globalParams);
+            globalParams = varParams;
+        }
         Iterator<Map.Entry<String, Property>> iter = globalParams.entrySet().iterator();
         while (iter.hasNext()) {
             Map.Entry<String, Property> en = iter.next();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 6fd4f34..50847f7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -151,10 +151,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
                     taskExecutionContext.getTaskInstanceId()));
 
             task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
-
             // task init
             task.init();
-
+            //init varPool
+            task.getParameters().setVarPool(taskExecutionContext.getVarPool());
             // task handle
             task.handle();
 
@@ -165,8 +165,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
             responseCommand.setEndTime(new Date());
             responseCommand.setProcessId(task.getProcessId());
             responseCommand.setAppIds(task.getAppIds());
-            responseCommand.setVarPool(task.getVarPool());
-            responseCommand.setResult(task.getResultString());
+            responseCommand.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
             logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
         } catch (Exception e) {
             logger.error("task scheduler failure", e);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index 5241f5f..3ea7bd2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -89,11 +89,6 @@ public abstract class AbstractCommandExecutor {
     protected boolean logOutputIsScuccess = false;
 
     /**
-     * SHELL result string
-     */
-    protected String taskResultString;
-
-    /**
      * taskExecutionContext
      */
     protected TaskExecutionContext taskExecutionContext;
@@ -365,7 +360,6 @@ public abstract class AbstractCommandExecutor {
                         varPool.append("$VarPool$");
                     } else {
                         logBuffer.add(line);
-                        taskResultString = line;
                     }
                 }
             } catch (Exception e) {
@@ -593,11 +587,4 @@ public abstract class AbstractCommandExecutor {
 
     protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
 
-    public String getTaskResultString() {
-        return taskResultString;
-    }
-
-    public void setTaskResultString(String taskResultString) {
-        this.taskResultString = taskResultString;
-    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index 45b94d2..81b8097 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -36,11 +36,6 @@ import org.slf4j.Logger;
 public abstract class AbstractTask {
 
     /**
-     * varPool string
-     */
-    protected String varPool;
-
-    /**
      * taskExecutionContext
      **/
     TaskExecutionContext taskExecutionContext;
@@ -57,11 +52,6 @@ public abstract class AbstractTask {
     protected int processId;
 
     /**
-     * SHELL result string
-     */
-    protected String resultString;
-
-    /**
      * other resource manager appId , for example : YARN etc
      */
     protected String appIds;
@@ -81,7 +71,7 @@ public abstract class AbstractTask {
      * constructor
      *
      * @param taskExecutionContext taskExecutionContext
-     * @param logger logger
+     * @param logger               logger
      */
     protected AbstractTask(TaskExecutionContext taskExecutionContext, Logger logger) {
         this.taskExecutionContext = taskExecutionContext;
@@ -139,14 +129,6 @@ public abstract class AbstractTask {
         }
     }
 
-    public void setVarPool(String varPool) {
-        this.varPool = varPool;
-    }
-
-    public String getVarPool() {
-        return varPool;
-    }
-
     /**
      * get exit status code
      *
@@ -176,14 +158,6 @@ public abstract class AbstractTask {
         this.processId = processId;
     }
 
-    public String getResultString() {
-        return resultString;
-    }
-
-    public void setResultString(String resultString) {
-        this.resultString = resultString;
-    }
-
     /**
      * get task parameters
      *
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index a8aa132..b785cb5 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -158,6 +158,7 @@ public class DataxTask extends AbstractTask {
             Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
                     taskExecutionContext.getDefinedParams(),
                     dataXParameters.getLocalParametersMap(),
+                    dataXParameters.getVarPoolMap(),
                     CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                     taskExecutionContext.getScheduleTime());
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
index 4d34190..27e5b42 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java
@@ -84,6 +84,7 @@ public class FlinkTask extends AbstractYarnTask {
             Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
                     taskExecutionContext.getDefinedParams(),
                     flinkParameters.getLocalParametersMap(),
+                    flinkParameters.getVarPoolMap(),
                     CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                     taskExecutionContext.getScheduleTime());
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
index 87adaab..7c68bc1c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
@@ -135,6 +135,7 @@ public class HttpTask extends AbstractTask {
         Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
                 taskExecutionContext.getDefinedParams(),
                 httpParameters.getLocalParametersMap(),
+                httpParameters.getVarPoolMap(),
                 CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                 taskExecutionContext.getScheduleTime());
         List<HttpProperty> httpPropertyList = new ArrayList<>();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
index f60b1cb..ce908df 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java
@@ -88,6 +88,7 @@ public class MapReduceTask extends AbstractYarnTask {
         Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
                 taskExecutionContext.getDefinedParams(),
                 mapreduceParameters.getLocalParametersMap(),
+                mapreduceParameters.getVarPoolMap(),
                 CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                 taskExecutionContext.getScheduleTime());
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
index 2166b1f..3748c7a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java
@@ -122,6 +122,7 @@ public class ProcedureTask extends AbstractTask {
             Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
                     taskExecutionContext.getDefinedParams(),
                     procedureParameters.getLocalParametersMap(),
+                    procedureParameters.getVarPoolMap(),
                     CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                     taskExecutionContext.getScheduleTime());
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index 6e561c1..e784a79 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -92,7 +92,7 @@ public class PythonTask extends AbstractTask {
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(commandExecuteResult.getAppIds());
             setProcessId(commandExecuteResult.getProcessId());
-            setVarPool(pythonCommandExecutor.getVarPool());
+            pythonParameters.dealOutParam(pythonCommandExecutor.getVarPool());
         }
         catch (Exception e) {
             logger.error("python task failure", e);
@@ -119,6 +119,7 @@ public class PythonTask extends AbstractTask {
         Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
                         taskExecutionContext.getDefinedParams(),
                         pythonParameters.getLocalParametersMap(),
+                        pythonParameters.getVarPoolMap(),
                         CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                         taskExecutionContext.getScheduleTime());
         
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 85f8ea0..e193571 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -105,8 +105,7 @@ public class ShellTask extends AbstractTask {
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(commandExecuteResult.getAppIds());
             setProcessId(commandExecuteResult.getProcessId());
-            setResult(shellCommandExecutor.getTaskResultString());
-            setVarPool(shellCommandExecutor.getVarPool());
+            shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
         } catch (Exception e) {
             logger.error("shell task error", e);
             setExitStatusCode(Constants.EXIT_CODE_FAILURE);
@@ -169,6 +168,7 @@ public class ShellTask extends AbstractTask {
         Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
             taskExecutionContext.getDefinedParams(),
             shellParameters.getLocalParametersMap(),
+            shellParameters.getVarPoolMap(),
             CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
             taskExecutionContext.getScheduleTime());
         // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
@@ -188,17 +188,4 @@ public class ShellTask extends AbstractTask {
         }
         return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
     }
-
-    public void setResult(String result) {
-        Map<String, Property> localParams = shellParameters.getLocalParametersMap();
-        List<Map<String, String>> outProperties = new ArrayList<>();
-        Map<String, String> p = new HashMap<>();
-        localParams.forEach((k,v) -> {
-            if (v.getDirect() == Direct.OUT) {
-                p.put(k, result);
-            }
-        });
-        outProperties.add(p);
-        resultString = JSONUtils.toJsonString(outProperties);
-    }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
index f6fec0f..a5a641c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java
@@ -113,6 +113,7 @@ public class SparkTask extends AbstractYarnTask {
         Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
             taskExecutionContext.getDefinedParams(),
             sparkParameters.getLocalParametersMap(),
+            sparkParameters.getVarPoolMap(),
             CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
             taskExecutionContext.getScheduleTime());
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 0716645..b174734 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -111,8 +111,7 @@ public class SqlTask extends AbstractTask {
         Thread.currentThread().setName(threadLoggerInfoName);
 
         logger.info("Full sql parameters: {}", sqlParameters);
-        logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {}, udfs : {},showType : {}, "
-                        + "connParams : {}, query max result limit : {}",
+        logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit  {}",
                 sqlParameters.getType(),
                 sqlParameters.getDatasource(),
                 sqlParameters.getSql(),
@@ -120,6 +119,7 @@ public class SqlTask extends AbstractTask {
                 sqlParameters.getUdfs(),
                 sqlParameters.getShowType(),
                 sqlParameters.getConnParams(),
+                sqlParameters.getVarPool(),
                 sqlParameters.getLimit());
         try {
             SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
@@ -171,6 +171,7 @@ public class SqlTask extends AbstractTask {
         Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
                 taskExecutionContext.getDefinedParams(),
                 sqlParameters.getLocalParametersMap(),
+                sqlParameters.getVarPoolMap(),
                 CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
                 taskExecutionContext.getScheduleTime());
 
@@ -264,10 +265,9 @@ public class SqlTask extends AbstractTask {
                 String updateResult = String.valueOf(stmt.executeUpdate());
                 result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
             }
-
+            //deal out params
+            sqlParameters.dealOutParam(result);
             postSql(connection, postStatementsBinds);
-            this.setResultString(result);
-
         } catch (Exception e) {
             logger.error("execute sql error: {}", e.getMessage());
             throw e;
@@ -276,6 +276,7 @@ public class SqlTask extends AbstractTask {
         }
     }
 
+
     public String setNonQuerySqlReturn(String updateResult, List<Property> properties) {
         String result = null;
         for (Property info :properties) {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index 00d94f0..1d1b32d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -76,6 +76,7 @@ public class SqoopTask extends AbstractYarnTask {
         Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()),
             sqoopTaskExecutionContext.getDefinedParams(),
             sqoopParameters.getLocalParametersMap(),
+            sqoopParameters.getVarPoolMap(),
             CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
             sqoopTaskExecutionContext.getScheduleTime());
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
index a42f187..fbc4ed8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
@@ -44,10 +44,14 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.text.ParseException;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -207,6 +211,50 @@ public class MasterExecThreadTest {
         }
     }
 
+    @Test
+    public void testGetPreVarPool() {
+        try {
+            Set<String> preTaskName = new HashSet<>();
+            preTaskName.add("test1");
+            preTaskName.add("test2");
+            Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
+
+            TaskInstance taskInstance = new TaskInstance();
+
+            TaskInstance taskInstance1 = new TaskInstance();
+            taskInstance1.setId(1);
+            taskInstance1.setName("test1");
+            taskInstance1.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"1\"}]");
+            taskInstance1.setEndTime(new Date());
+
+            TaskInstance taskInstance2 = new TaskInstance();
+            taskInstance2.setId(2);
+            taskInstance2.setName("test2");
+            taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test2\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
+            taskInstance2.setEndTime(new Date());
+
+            completeTaskList.put("test1", taskInstance1);
+            completeTaskList.put("test2", taskInstance2);
+
+            Class<MasterExecThread> masterExecThreadClass = MasterExecThread.class;
+
+            Field field = masterExecThreadClass.getDeclaredField("completeTaskList");
+            field.setAccessible(true);
+            field.set(masterExecThread, completeTaskList);
+
+            masterExecThread.getPreVarPool(taskInstance, preTaskName);
+            Assert.assertNotNull(taskInstance.getVarPool());
+            taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
+            completeTaskList.put("test2", taskInstance2);
+            field.setAccessible(true);
+            field.set(masterExecThread, completeTaskList);
+            masterExecThread.getPreVarPool(taskInstance, preTaskName);
+            Assert.assertNotNull(taskInstance.getVarPool());
+        } catch (Exception e) {
+            Assert.fail();
+        }
+    }
+
     private List<Schedule> zeroSchedulerList() {
         return Collections.EMPTY_LIST;
     }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
index 48b34d5..12613c6 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java
@@ -20,20 +20,20 @@ import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DataType;
 import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
-import org.apache.dolphinscheduler.common.utils.*;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  *  user define param
@@ -73,20 +73,19 @@ public class ParamsTest {
     }
 
     @Test
-    public void convertTest()throws Exception{
-        Map<String,Property> globalParams = new HashMap<>();
+    public void convertTest() throws Exception {
+        Map<String, Property> globalParams = new HashMap<>();
         Property property = new Property();
         property.setProp("global_param");
         property.setDirect(Direct.IN);
         property.setType(DataType.VARCHAR);
         property.setValue("${system.biz.date}");
-        globalParams.put("global_param",property);
+        globalParams.put("global_param", property);
 
-        Map<String,String> globalParamsMap = new HashMap<>();
-        globalParamsMap.put("global_param","${system.biz.date}");
+        Map<String, String> globalParamsMap = new HashMap<>();
+        globalParamsMap.put("global_param", "${system.biz.date}");
 
-
-        Map<String,Property> localParams = new HashMap<>();
+        Map<String, Property> localParams = new HashMap<>();
         Property localProperty = new Property();
         localProperty.setProp("local_param");
         localProperty.setDirect(Direct.IN);
@@ -94,8 +93,16 @@ public class ParamsTest {
         localProperty.setValue("${global_param}");
         localParams.put("local_param", localProperty);
 
+        Map<String, Property> varPoolParams = new HashMap<>();
+        Property varProperty = new Property();
+        varProperty.setProp("local_param");
+        varProperty.setDirect(Direct.IN);
+        varProperty.setType(DataType.VARCHAR);
+        varProperty.setValue("${global_param}");
+        varPoolParams.put("varPool", varProperty);
+
         Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap,
-                localParams, CommandType.START_PROCESS, new Date());
+                localParams,varPoolParams, CommandType.START_PROCESS, new Date());
         logger.info(JSONUtils.toJsonString(paramsMap));
 
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index ec0807c..5d10f84 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -70,8 +70,7 @@ public class TaskResponseServiceTest {
             "ids",
             22,
             "varPol",
-            channel,
-                "[{\"id\":70000,\"database_name\":\"yuul\",\"status\":-1,\"create_time\":1601202829000,\"update_time\":1601202829000,\"table_name3\":\"\",\"table_name4\":\"\"}]");
+            channel);
 
         taskInstance = new TaskInstance();
         taskInstance.setId(22);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
index 220cce5..a9a1b89 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
@@ -17,23 +17,24 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DataType;
 import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.process.Property;
-import org.apache.dolphinscheduler.common.utils.*;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test ParamUtils
@@ -49,8 +50,11 @@ public class ParamUtilsTest {
 
     public Map<String, Property> localParams = new HashMap<>();
 
+    public Map<String, Property> varPoolParams = new HashMap<>();
+
     /**
      * Init params
+     *
      * @throws Exception
      */
     @Before
@@ -71,6 +75,14 @@ public class ParamUtilsTest {
         localProperty.setType(DataType.VARCHAR);
         localProperty.setValue("${global_param}");
         localParams.put("local_param", localProperty);
+
+        Property varProperty = new Property();
+        varProperty.setProp("local_param");
+        varProperty.setDirect(Direct.IN);
+        varProperty.setType(DataType.VARCHAR);
+        varProperty.setValue("${global_param}");
+        varPoolParams.put("varPool", varProperty);
+
     }
 
     /**
@@ -80,16 +92,20 @@ public class ParamUtilsTest {
     public void testConvert() {
 
         //The expected value
-        String expected = "{\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
+        String expected = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+                + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+                + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
         //The expected value when globalParams is null but localParams is not null
-        String expected1 = "{\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
+        String expected1 = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+                + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+                + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}";
         //Define expected date , the month is 0-base
         Calendar calendar = Calendar.getInstance();
-        calendar.set(2019,11,30);
+        calendar.set(2019, 11, 30);
         Date date = calendar.getTime();
 
         //Invoke convert
-        Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap, localParams, CommandType.START_PROCESS, date);
+        Map<String, Property> paramsMap = ParamUtils.convert(globalParams, globalParamsMap, localParams, varPoolParams,CommandType.START_PROCESS, date);
         String result = JSONUtils.toJsonString(paramsMap);
         assertEquals(expected, result);
 
@@ -101,12 +117,12 @@ public class ParamUtilsTest {
         }
 
         //Invoke convert with null globalParams
-        Map<String, Property> paramsMap1 = ParamUtils.convert(null, globalParamsMap, localParams, CommandType.START_PROCESS, date);
+        Map<String, Property> paramsMap1 = ParamUtils.convert(null, globalParamsMap, localParams,varPoolParams, CommandType.START_PROCESS, date);
         String result1 = JSONUtils.toJsonString(paramsMap1);
         assertEquals(expected1, result1);
 
         //Null check, invoke convert with null globalParams and null localParams
-        Map<String, Property> paramsMap2 = ParamUtils.convert(null, globalParamsMap, null, CommandType.START_PROCESS, date);
+        Map<String, Property> paramsMap2 = ParamUtils.convert(null, globalParamsMap, null, varPoolParams,CommandType.START_PROCESS, date);
         assertNull(paramsMap2);
     }
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index c3f6478..53c60d7 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -67,8 +67,6 @@ public class TaskCallbackServiceTest {
         taskCallbackService.sendAck(1, ackCommand.convert2Command());
 
         TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
-        String result = responseCommand.getResult();
-        responseCommand.setResult("return string");
         taskCallbackService.sendResult(1, responseCommand.convert2Command());
 
         Stopper.stop();
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
index 36a758a..25fa22a 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessorTest.java
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 
@@ -53,6 +54,8 @@ public class TaskKillProcessorTest {
 
     private TaskKillProcessor taskKillProcessor;
 
+    private WorkerManagerThread workerManager;
+
     private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
 
     private Channel channel;
@@ -85,6 +88,8 @@ public class TaskKillProcessorTest {
         PowerMockito.mockStatic(LoggerUtils.class);
         PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class)).thenReturn(taskCallbackService);
         PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
+        WorkerManagerThread workerManager = PowerMockito.mock(WorkerManagerThread.class);
+        PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
         PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager);
         PowerMockito.doNothing().when(taskCallbackService).addRemoteChannel(anyInt(), any());
         PowerMockito.whenNew(NettyRemoteChannel.class).withAnyArguments().thenReturn(null);
@@ -102,7 +107,6 @@ public class TaskKillProcessorTest {
 
     @Test
     public void testProcess() {
-
         PowerMockito.when(taskExecutionContextCacheManager.getByTaskInstanceId(1)).thenReturn(taskExecutionContext);
         taskKillProcessor.process(channel, command);
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index e176462..0c337e0 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -165,7 +166,7 @@ public class TaskExecuteThreadTest {
 
         @Override
         public AbstractParameters getParameters() {
-            return null;
+            return new SqlParameters();
         }
 
         @Override
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java
deleted file mode 100644
index 348775c..0000000
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutorTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.task;
-
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
-public class AbstractCommandExecutorTest {
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutorTest.class);
-
-    private ShellCommandExecutor shellCommandExecutor;
-
-    @Before
-    public void before() throws Exception {
-        System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
-        shellCommandExecutor = new ShellCommandExecutor(null);
-    }
-
-    @Test
-    public void testSetTaskResultString() {
-        shellCommandExecutor.setTaskResultString("shellReturn");
-    }
-
-    @Test
-    public void testGetTaskResultString() {
-        logger.info(shellCommandExecutor.getTaskResultString());
-    }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
index 892299c..574f0e7 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/ShellTaskReturnTest.java
@@ -110,17 +110,6 @@ public class ShellTaskReturnTest {
         } catch (Exception e) {
             e.printStackTrace();
         }
-        shellTask.setResult("shell return string");
-        logger.info("shell return string:{}", shellTask.getResultString());
     }
 
-    @Test
-    public void testSetTaskResultString() {
-        shellCommandExecutor.setTaskResultString("shellReturn");
-    }
-
-    @Test
-    public void testGetTaskResultString() {
-        logger.info(shellCommandExecutor.getTaskResultString());
-    }
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
index 46d2713..cb8a189 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
@@ -164,11 +164,6 @@ public class TaskManagerTest {
         definedParams.put("time_gb", "2020-12-16 00:00:00");
         taskExecutionContext.setDefinedParams(definedParams);
         ShellTask shellTask = (ShellTask) TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
-        shellTask.setResultString("shell return");
-        String shellReturn = shellTask.getResultString();
-        shellTask.init();
-        shellTask.setResult(shellReturn);
-        Assert.assertSame(shellReturn, "shell return");
     }
 
     @Test
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java
new file mode 100644
index 0000000..f384f83
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskParamsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.dolphinscheduler.common.enums.DataType;
+import org.apache.dolphinscheduler.common.enums.Direct;
+import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
+import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task params test: checks that dealOutParam fills the varPool for shell and SQL parameters.
+ */
+@RunWith(PowerMockRunner.class)
+public class TaskParamsTest {
+    private static final Logger logger = LoggerFactory.getLogger(TaskParamsTest.class);
+
+    @Test
+    public void testDealOutParam() {
+        List<Property> properties = new ArrayList<>();
+        Property property = new Property();
+        property.setProp("test1");
+        property.setDirect(Direct.OUT);
+        property.setType(DataType.VARCHAR);
+        property.setValue("test1");
+        properties.add(property);
+
+        ShellParameters shellParameters = new ShellParameters();
+        String resultShell = "key1=value1$VarPoolkey2=value2";
+        shellParameters.varPool = new ArrayList<>();
+        shellParameters.setLocalParams(properties);
+        shellParameters.dealOutParam(resultShell);
+        assertNotNull(shellParameters.getVarPool().get(0));
+
+        String sqlResult = "[{\"id\":6,\"test1\":\"6\"},{\"id\":70002,\"test1\":\"+1\"}]";
+        SqlParameters sqlParameters = new SqlParameters();
+        String sqlResult1 = "[{\"id\":6,\"test1\":\"6\"}]";
+        sqlParameters.setLocalParams(properties);
+        sqlParameters.varPool = new ArrayList<>();
+        sqlParameters.dealOutParam(sqlResult1);
+        assertNotNull(sqlParameters.getVarPool().get(0));
+
+        property.setType(DataType.LIST);
+        properties.clear();
+        properties.add(property);
+        sqlParameters.setLocalParams(properties);
+        sqlParameters.dealOutParam(sqlResult);
+        assertNotNull(sqlParameters.getVarPool().get(0));
+    }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
index bd02f61..c992a0a 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
@@ -62,7 +62,6 @@ public class ShellTaskTest {
         System.setProperty("log4j2.disable.jmx", Boolean.TRUE.toString());
         shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
         PowerMockito.whenNew(ShellCommandExecutor.class).withAnyArguments().thenReturn(shellCommandExecutor);
-        shellCommandExecutor.setTaskResultString("shellReturn");
         taskExecutionContext = new TaskExecutionContext();
         taskExecutionContext.setTaskInstanceId(1);
         taskExecutionContext.setTaskName("kris test");
@@ -85,6 +84,7 @@ public class ShellTaskTest {
         taskExecutionContext.setTenantCode("roo");
         taskExecutionContext.setScheduleTime(new Date());
         taskExecutionContext.setQueue("default");
+        taskExecutionContext.setVarPool("[{\"direct\":\"IN\",\"prop\":\"test\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
         taskExecutionContext.setTaskParams(
             "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss +3]\\necho \\\" ?? ${time1} \\\"\\necho \\\" ????? ${time2}\\\"\\n\",\"localParams\":"
                 +
@@ -105,6 +105,7 @@ public class ShellTaskTest {
     public void testComplementData() throws Exception {
         shellTask = new ShellTask(taskExecutionContext, logger);
         shellTask.init();
+        shellTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
         shellCommandExecutor.isSuccessOfYarnState(new ArrayList<>());
         shellCommandExecutor.isSuccessOfYarnState(null);
         PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
@@ -116,16 +117,9 @@ public class ShellTaskTest {
         taskExecutionContext.setCmdTypeIfComplement(0);
         shellTask = new ShellTask(taskExecutionContext, logger);
         shellTask.init();
+        shellTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
         PowerMockito.when(shellCommandExecutor.run(anyString())).thenReturn(commandExecuteResult);
         shellTask.handle();
     }
 
-    @Test
-    public void testSetResult() {
-        shellTask = new ShellTask(taskExecutionContext, logger);
-        shellTask.init();
-        String r = "return";
-        shellTask.setResult(r);
-    }
-
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
index 0eb0f98..6336774 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
@@ -97,6 +97,7 @@ public class SqlTaskTest {
         PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
         PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
         PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
+        PowerMockito.when(taskExecutionContext.getVarPool()).thenReturn("[{\"direct\":\"IN\",\"prop\":\"test\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
 
         SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
         sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
@@ -106,6 +107,7 @@ public class SqlTaskTest {
         PowerMockito.when(SpringApplicationContext.getBean(Mockito.any())).thenReturn(new AlertDao());
         alertClientService = PowerMockito.mock(AlertClientService.class);
         sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService);
+        sqlTask.getParameters().setVarPool(taskExecutionContext.getVarPool());
         sqlTask.init();
     }
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 93298f8..f7b5de3 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -133,7 +133,6 @@ import org.springframework.transaction.annotation.Transactional;
 
 import com.cronutils.model.Cron;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
@@ -1584,71 +1583,51 @@ public class ProcessService {
                                 int processId,
                                 String appIds,
                                 int taskInstId,
-                                String varPool,
-                                String result) {
+                                String varPool) {
         taskInstance.setPid(processId);
         taskInstance.setAppLink(appIds);
         taskInstance.setState(state);
         taskInstance.setEndTime(endTime);
         taskInstance.setVarPool(varPool);
-        changeOutParam(result, taskInstance);
+        changeOutParam(taskInstance);
         saveTaskInstance(taskInstance);
     }
 
-    public void changeOutParam(String result, TaskInstance taskInstance) {
-        if (StringUtils.isEmpty(result)) {
+    /**
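+     * Copies the OUT values from the task instance's varPool into its local params, so they can be shown on the task instance page.
+     * @param taskInstance the task instance whose task params are updated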
+     */
+    public void changeOutParam(TaskInstance taskInstance) {
+        if (StringUtils.isEmpty(taskInstance.getVarPool())) {
             return;
         }
-        List<Map<String, String>> workerResultParam = getListMapByString(result);
-        if (CollectionUtils.isEmpty(workerResultParam)) {
+        List<Property> properties = JSONUtils.toList(taskInstance.getVarPool(), Property.class);
+        if (CollectionUtils.isEmpty(properties)) {
             return;
         }
         //if the result more than one line,just get the first .
-        Map<String, String> row = workerResultParam.get(0);
-        if (row == null || row.size() == 0) {
-            return;
-        }
         Map<String, Object> taskParams = JSONUtils.toMap(taskInstance.getTaskParams(), String.class, Object.class);
         Object localParams = taskParams.get(LOCAL_PARAMS);
         if (localParams == null) {
             return;
         }
-        ProcessInstance processInstance = this.processInstanceMapper.queryDetailById(taskInstance.getProcessInstanceId());
-        List<Property> params4Property = JSONUtils.toList(processInstance.getGlobalParams(), Property.class);
-        Map<String, Property> allParamMap = params4Property.stream().collect(Collectors.toMap(Property::getProp, Property -> Property));
-
         List<Property> allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class);
+        Map<String, String> outProperty = new HashMap<>();
+        for (Property info : properties) {
+            if (info.getDirect() == Direct.OUT) {
+                outProperty.put(info.getProp(), info.getValue());
+            }
+        }
         for (Property info : allParam) {
             if (info.getDirect() == Direct.OUT) {
                 String paramName = info.getProp();
-                Property property = allParamMap.get(paramName);
-                if (property == null) {
-                    continue;
-                }
-                String value = String.valueOf(row.get(paramName));
-                if (StringUtils.isNotEmpty(value)) {
-                    property.setValue(value);
-                    info.setValue(value);
-                }
+                info.setValue(outProperty.get(paramName));
             }
         }
         taskParams.put(LOCAL_PARAMS, allParam);
         taskInstance.setTaskParams(JSONUtils.toJsonString(taskParams));
-        String params4ProcessString = JSONUtils.toJsonString(params4Property);
-        int updateCount = this.processInstanceMapper.updateGlobalParamsById(params4ProcessString, processInstance.getId());
-        logger.info("updateCount:{}, params4Process:{}, processInstanceId:{}", updateCount, params4ProcessString, processInstance.getId());
     }
 
-    public List<Map<String, String>> getListMapByString(String json) {
-        List<Map<String, String>> allParams = new ArrayList<>();
-        ArrayNode paramsByJson = JSONUtils.parseArray(json);
-        Iterator<JsonNode> listIterator = paramsByJson.iterator();
-        while (listIterator.hasNext()) {
-            Map<String, String> param = JSONUtils.toMap(listIterator.next().toString(), String.class, String.class);
-            allParams.add(param);
-        }
-        return allParams;
-    }
 
     /**
      * convert integer list to string list
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 4f4144e..643dc09 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -509,4 +509,19 @@ public class ProcessServiceTest {
         Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
     }
 
+    @Test
+    public void testChangeOutParam() {
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setProcessInstanceId(62);
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setId(62);
+        taskInstance.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"\"}]");
+        taskInstance.setTaskParams("{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select id from tb_test limit 1\","
+                + "\"udfs\":\"\",\"sqlType\":\"0\",\"sendEmail\":false,\"displayRows\":10,\"title\":\"\","
+                + "\"groupId\":null,\"localParams\":[{\"prop\":\"test1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"12\"}],"
+                + "\"connParams\":\"\",\"preStatements\":[],\"postStatements\":[],\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+                + "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":\"{}\"}");
+        processService.changeOutParam(taskInstance);
+    }
+
 }
diff --git a/pom.xml b/pom.xml
index 3bfe355..482488f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1009,8 +1009,8 @@
                         <include>**/server/worker/task/processdure/ProcedureTaskTest.java</include>
                         <include>**/server/worker/task/shell/ShellTaskTest.java</include>
                         <include>**/server/worker/task/TaskManagerTest.java</include>
-                        <include>**/server/worker/task/AbstractCommandExecutorTest.java</include>
                         <include>**/server/worker/task/PythonCommandExecutorTest.java</include>
+                        <include>**/server/worker/task/TaskParamsTest.java</include>
                         <include>**/server/worker/task/ShellTaskReturnTest.java</include>
                         <include>**/server/worker/task/sql/SqlTaskTest.java</include>
                         <include>**/server/worker/runner/TaskExecuteThreadTest.java</include>