You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/12/18 10:17:18 UTC

[dolphinscheduler] branch dev updated: [Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks (#13194)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 66e20271ad [Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks (#13194)
66e20271ad is described below

commit 66e20271ad04c571dbd05c114ccbc050ee128dfb
Author: JieguangZhou <ji...@163.com>
AuthorDate: Sun Dec 18 18:17:09 2022 +0800

    [Feature][Master] Add task caching mechanism to improve the running speed of repetitive tasks (#13194)
    
    * Supports task instance cache operation
    
    * add task plugin cache
    
    * use SHA-256 to generate key
    
    * Update dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
    
    Co-authored-by: Jay Chung <zh...@gmail.com>
    
    * Update dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
    
    Co-authored-by: Jay Chung <zh...@gmail.com>
    
    * Optimizing database Scripts
    
    * Optimize clear cache operation
    
    Co-authored-by: Jay Chung <zh...@gmail.com>
---
 docs/docs/en/faq.md                                |  15 ++
 docs/docs/en/guide/task/appendix.md                |   1 +
 docs/docs/zh/faq.md                                |  15 ++
 docs/docs/zh/guide/task/appendix.md                |   1 +
 .../api/controller/TaskInstanceController.java     |  25 +++
 .../TaskInstanceRemoveCacheResponse.java           |  45 ++----
 .../apache/dolphinscheduler/api/enums/Status.java  |   3 +
 .../api/service/TaskInstanceService.java           |  10 ++
 .../api/service/impl/TaskInstanceServiceImpl.java  |  36 +++++
 .../api/service/TaskInstanceServiceTest.java       |  31 ++++
 .../apache/dolphinscheduler/common/enums/Flag.java |   1 +
 .../common/enums/TaskEventType.java                |   3 +-
 .../dao/entity/TaskDefinition.java                 |   6 +
 .../dao/entity/TaskDefinitionLog.java              |   1 +
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  13 ++
 .../dao/mapper/TaskInstanceMapper.java             |   4 +
 .../dao/repository/TaskInstanceDao.java            |  14 ++
 .../dao/repository/impl/TaskInstanceDaoImpl.java   |  20 +++
 .../dolphinscheduler/dao/utils/TaskCacheUtils.java | 161 +++++++++++++++++++
 .../dao/mapper/TaskDefinitionLogMapper.xml         |   6 +-
 .../dao/mapper/TaskDefinitionMapper.xml            |   8 +-
 .../dao/mapper/TaskInstanceMapper.xml              |  16 +-
 .../src/main/resources/sql/dolphinscheduler_h2.sql |   4 +
 .../main/resources/sql/dolphinscheduler_mysql.sql  |   7 +-
 .../resources/sql/dolphinscheduler_postgresql.sql  |   5 +
 .../3.2.0_schema/mysql/dolphinscheduler_ddl.sql    |  98 ++++++++++++
 .../postgresql/dolphinscheduler_ddl.sql            |  14 ++
 .../dao/utils/TaskCacheUtilsTest.java              | 172 +++++++++++++++++++++
 .../master/consumer/TaskPriorityQueueConsumer.java |  49 ++++++
 .../server/master/event/TaskCacheEventHandler.java | 109 +++++++++++++
 .../server/master/processor/queue/TaskEvent.java   |  11 ++
 .../master/runner/WorkflowExecuteRunnable.java     |  21 +++
 .../master/event/TaskCacheEventHandlerTest.java    | 112 ++++++++++++++
 .../dolphinscheduler/service/model/TaskNode.java   |  10 ++
 .../service/process/ProcessServiceImpl.java        |   1 +
 .../service/process/ProcessServiceTest.java        |   2 +
 dolphinscheduler-ui/src/locales/en_US/project.ts   |   4 +-
 dolphinscheduler-ui/src/locales/zh_CN/project.ts   |   4 +-
 .../src/service/modules/task-instances/index.ts    |   7 +
 .../projects/task/components/node/fields/index.ts  |   1 +
 .../task/components/node/fields/use-cache.ts       |  17 +-
 .../task/components/node/fields/use-run-flag.ts    |   3 +-
 .../projects/task/components/node/format-data.ts   |   2 +
 .../task/components/node/tasks/use-chunjun.ts      |   1 +
 .../task/components/node/tasks/use-data-quality.ts |   1 +
 .../task/components/node/tasks/use-datasync.ts     |   3 +-
 .../task/components/node/tasks/use-datax.ts        |   1 +
 .../task/components/node/tasks/use-dinky.ts        |   1 +
 .../projects/task/components/node/tasks/use-dms.ts |   1 +
 .../projects/task/components/node/tasks/use-dvc.ts |   1 +
 .../projects/task/components/node/tasks/use-emr.ts |   1 +
 .../task/components/node/tasks/use-flink-stream.ts |   1 +
 .../task/components/node/tasks/use-flink.ts        |   1 +
 .../task/components/node/tasks/use-hive-cli.ts     |   1 +
 .../task/components/node/tasks/use-http.ts         |   1 +
 .../task/components/node/tasks/use-java.ts         |   1 +
 .../task/components/node/tasks/use-jupyter.ts      |   1 +
 .../projects/task/components/node/tasks/use-k8s.ts |   1 +
 .../task/components/node/tasks/use-kubeflow.ts     |   1 +
 .../task/components/node/tasks/use-linkis.ts       |   1 +
 .../task/components/node/tasks/use-mlflow.ts       |   1 +
 .../projects/task/components/node/tasks/use-mr.ts  |   1 +
 .../task/components/node/tasks/use-openmldb.ts     |   1 +
 .../task/components/node/tasks/use-pigeon.ts       |   1 +
 .../task/components/node/tasks/use-procedure.ts    |   1 +
 .../task/components/node/tasks/use-python.ts       |   1 +
 .../task/components/node/tasks/use-pytorch.ts      |   1 +
 .../task/components/node/tasks/use-sagemaker.ts    |   1 +
 .../task/components/node/tasks/use-sea-tunnel.ts   |   1 +
 .../task/components/node/tasks/use-shell.ts        |   1 +
 .../task/components/node/tasks/use-spark.ts        |   1 +
 .../projects/task/components/node/tasks/use-sql.ts |   1 +
 .../task/components/node/tasks/use-sqoop.ts        |   1 +
 .../task/components/node/tasks/use-zeppelin.ts     |   1 +
 .../views/projects/task/components/node/types.ts   |   4 +-
 .../workflow/components/dag/dag-context-menu.tsx   |  31 +++-
 .../projects/workflow/components/dag/index.tsx     |   8 +
 77 files changed, 1105 insertions(+), 59 deletions(-)

diff --git a/docs/docs/en/faq.md b/docs/docs/en/faq.md
index 93d0710156..042b3d7fa2 100644
--- a/docs/docs/en/faq.md
+++ b/docs/docs/en/faq.md
@@ -752,4 +752,19 @@ start API server. If you want disabled when Python gateway service you could cha
 
 ---
 
+## Q: When a task runs with cache execution enabled, how is it determined whether a cached result already exists, that is, whether the task can reuse the running result of another task?
+
+A: For the task identified as `Cache Execution`, when the task starts, a cache key will be generated, and the key is composed of the following fields and hashed:
+
+- task definition: the id of the task definition corresponding to the task instance
+- task version: the version of the task definition corresponding to the task instance
+- task input parameters: among the parameters passed in by upstream nodes and the global parameters, those referenced by the parameter list of the task definition and those referenced in the task definition using `${}`
+- environment configuration: the actual configuration content of the environment configuration under the environment name, that is, the actual configuration content in the `security` - `environment management`
+
+When a task marked for cache execution runs, it checks whether the database already contains data with the same cache key:
+- If there is, copy the task instance and update the corresponding data
+- If not, the task runs as usual, and the task instance data is stored in the cache when the task is completed
+
+If you no longer need the cached result, you can right-click the node in the workflow instance and run `Clear cache`, which clears the cache data for the current input parameters under this version.
+
 We will collect more FAQ later
diff --git a/docs/docs/en/guide/task/appendix.md b/docs/docs/en/guide/task/appendix.md
index 57cb5dbc31..4569216617 100644
--- a/docs/docs/en/guide/task/appendix.md
+++ b/docs/docs/en/guide/task/appendix.md
@@ -8,6 +8,7 @@ DolphinScheduler task plugins share some common default parameters. Each type of
 |--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | Node Name                | The name of the task. Node names within the same workflow must be unique.                                                                                                                                                                                                                      |
 | Run Flag                 | Indicating whether to schedule the task. If you do not need to execute the task, you can turn on the `Prohibition execution` switch.                                                                                                                                                           |
+| Cache Execution          | Indicating whether this node needs to be cached. If it is cached, the same identifier (same task version, same task definition, same parameter input) task is cached. When the task has been cached, it will not be executed again, and the result will be reused directly.                    |
 | Description              | Describing the function of this node.                                                                                                                                                                                                                                                          |
 | Task Priority            | When the number of the worker threads is insufficient, the worker executes task according to the priority. When two tasks have the same priority, the worker will execute them in `first come first served` fashion.                                                                           |
 | Worker Group             | Machines which execute the tasks. If you choose `default`, scheduler will send the task to a random worker.                                                                                                                                                                                    |
diff --git a/docs/docs/zh/faq.md b/docs/docs/zh/faq.md
index 9461f275cf..294762877d 100644
--- a/docs/docs/zh/faq.md
+++ b/docs/docs/zh/faq.md
@@ -720,4 +720,19 @@ A:在 3.0.0-alpha 版本之后,Python gateway server 集成到 api server 
 
 ---
 
+## Q: 缓存执行时怎么判断任务已经存在缓存过的任务,即如何判断一个任务可以使用另外一个任务的运行结果?
+
+A: 对于标识为`缓存执行`的任务, 当任务启动时会生成一个缓存key, 该key由以下字段组合哈希得到:
+
+- 任务定义:任务实例对应的任务定义的id
+- 任务的版本:任务实例对应的任务定义的版本
+- 任务输入的参数:包括上游节点和全局参数传入的参数中,被任务定义的参数列表所引用和任务定义中使用`${}`引用的参数
+- 环境配置: 环境名称下具体的环境配置内容,具体为安全中心环境管理中的实际配置内容
+
+当缓存标识的任务运行时,会查找数据库中是否有相同缓存key的数据,
+- 若有则复制该任务实例并进行相应数据的更新
+- 若无,则任务照常运行,并在任务完成时将任务实例的数据存入缓存
+
+若不需要缓存时,可以在工作流实例中右键运行清除缓存,则会清除该版本下当前输入的参数的缓存数据。
+
 我们会持续收集更多的 FAQ。
diff --git a/docs/docs/zh/guide/task/appendix.md b/docs/docs/zh/guide/task/appendix.md
index 3966e79ea7..6ba27ad38b 100644
--- a/docs/docs/zh/guide/task/appendix.md
+++ b/docs/docs/zh/guide/task/appendix.md
@@ -8,6 +8,7 @@
 |----------|--------------------------------------------------------------------------------------------------------------------------------------|
 | 任务名称     | 任务的名称,同一个工作流定义中的节点名称不能重复。                                                                                                            |
 | 运行标志     | 标识这个节点是否需要调度执行,如果不需要执行,可以打开禁止执行开关。                                                                                                   |
+| 缓存执行     | 标识这个节点是否需要进行缓存,如果缓存,则对于相同标识(相同任务版本,相同任务定义,相同参数传入)的任务进行缓存,运行时若已经存在缓存过的任务时,不再重复执行,直接复用结果。                                              |
 | 描述       | 当前节点的功能描述。                                                                                                                           |
 | 任务优先级    | worker线程数不足时,根据优先级从高到低依次执行任务,优先级一样时根据先到先得原则执行。                                                                                       |
 | Worker分组 | 设置分组后,任务会被分配给worker组的机器机执行。若选择Default,则会随机选择一个worker执行。                                                                              |
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
index b9fabb0f32..af23a4ce0b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
@@ -19,10 +19,12 @@ package org.apache.dolphinscheduler.api.controller;
 
 import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;
+import static org.apache.dolphinscheduler.api.enums.Status.REMOVE_TASK_INSTANCE_CACHE_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.TASK_SAVEPOINT_ERROR;
 import static org.apache.dolphinscheduler.api.enums.Status.TASK_STOP_ERROR;
 
 import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
+import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
 import org.apache.dolphinscheduler.api.exceptions.ApiException;
 import org.apache.dolphinscheduler.api.service.TaskInstanceService;
 import org.apache.dolphinscheduler.api.utils.Result;
@@ -34,6 +36,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -188,4 +191,26 @@ public class TaskInstanceController extends BaseController {
                                    @PathVariable(value = "id") Integer id) {
         return taskInstanceService.stopTask(loginUser, projectCode, id);
     }
+
+    /**
+     * remove task instance cache
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param id task instance id
+     * @return the result code and msg
+     */
+    @Operation(summary = "remove-task-instance-cache", description = "REMOVE_TASK_INSTANCE_CACHE")
+    @Parameters({
+            @Parameter(name = "id", description = "TASK_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "12"))
+    })
+    @DeleteMapping(value = "/{id}/remove-cache")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(REMOVE_TASK_INSTANCE_CACHE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                                                                   @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+                                                                   @PathVariable(value = "id") Integer id) {
+        return taskInstanceService.removeTaskInstanceCache(loginUser, projectCode, id);
+    }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java
similarity index 57%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java
copy to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java
index 80c9cae999..34a8218304 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/taskInstance/TaskInstanceRemoveCacheResponse.java
@@ -15,41 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+package org.apache.dolphinscheduler.api.dto.taskInstance;
 
-import com.baomidou.mybatisplus.annotation.EnumValue;
+import org.apache.dolphinscheduler.api.utils.Result;
+
+import lombok.Data;
 
 /**
- * have_script
- * have_file
- * can_retry
- * have_arr_variables
- * have_map_variables
- * have_alert
+ * response of the remove task instance cache operation
  */
-public enum Flag {
-
-    /**
-     * 0 no
-     * 1 yes
-     */
-    NO(0, "no"),
-    YES(1, "yes");
-
-    Flag(int code, String descp) {
-        this.code = code;
-        this.descp = descp;
-    }
+@Data
+public class TaskInstanceRemoveCacheResponse extends Result {
 
-    @EnumValue
-    private final int code;
-    private final String descp;
+    private String cacheKey;
 
-    public int getCode() {
-        return code;
+    public TaskInstanceRemoveCacheResponse(Result result) {
+        super();
+        this.setCode(result.getCode());
+        this.setMsg(result.getMsg());
     }
 
-    public String getDescp() {
-        return descp;
+    public TaskInstanceRemoveCacheResponse(Result result, String cacheKey) {
+        super();
+        this.setCode(result.getCode());
+        this.setMsg(result.getMsg());
+        this.cacheKey = cacheKey;
     }
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 0f1cb95b47..cb1941b2a0 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -278,6 +278,7 @@ public enum Status {
     UDF_RESOURCE_IS_BOUND(20013, "udf resource file is bound by UDF functions:{0}", "udf函数绑定了资源文件[{0}]"),
     RESOURCE_IS_USED(20014, "resource file is used by process definition", "资源文件被上线的流程定义使用了"),
     PARENT_RESOURCE_NOT_EXIST(20015, "parent resource not exist", "父资源文件不存在"),
+
     RESOURCE_NOT_EXIST_OR_NO_PERMISSION(20016,
             "resource not exist or no permission,please view the task node and remove error resource",
             "请检查任务节点并移除无权限或者已删除的资源"),
@@ -285,6 +286,8 @@ public enum Status {
             "资源文件已授权其他用户[{0}],后缀不允许修改"),
     RESOURCE_HAS_FOLDER(20018, "There are files or folders in the current directory:{0}", "当前目录下有文件或文件夹[{0}]"),
 
+    REMOVE_TASK_INSTANCE_CACHE_ERROR(20019, "remove task instance cache error", "删除任务实例缓存错误"),
+
     USER_NO_OPERATION_PERM(30001, "user has no operation privilege", "当前用户没有操作权限"),
     USER_NO_OPERATION_PROJECT_PERM(30002, "user {0} is not has project {1} permission", "当前用户[{0}]没有[{1}]项目的操作权限"),
     USER_NO_WRITE_PROJECT_PERM(30003, "user [{0}] does not have write permission for project [{1}]",
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
index 9fa259eb2b..ed23d3695e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -100,4 +101,13 @@ public interface TaskInstanceService {
      * @return the result code and msg
      */
     TaskInstance queryTaskInstanceById(User loginUser, long projectCode, Long taskInstanceId);
+
+    /**
+     * remove task instance cache
+     * @param loginUser
+     * @param projectCode
+     * @param taskInstanceId
+     * @return
+     */
+    TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode, Integer taskInstanceId);
 }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index a212cb8299..1d4f9b64de 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -18,8 +18,10 @@
 package org.apache.dolphinscheduler.api.service.impl;
 
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.FORCED_SUCCESS;
+import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_INSTANCE;
 
+import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
@@ -38,6 +40,8 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
 import org.apache.dolphinscheduler.remote.command.TaskSavePointRequestCommand;
@@ -45,6 +49,9 @@ import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -81,6 +88,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
     @Autowired
     TaskInstanceMapper taskInstanceMapper;
 
+    @Autowired
+    TaskInstanceDao taskInstanceDao;
+
     @Autowired
     ProcessInstanceService processInstanceService;
 
@@ -319,4 +329,30 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
         }
         return taskInstance;
     }
+
+    @Override
+    public TaskInstanceRemoveCacheResponse removeTaskInstanceCache(User loginUser, long projectCode,
+                                                                   Integer taskInstanceId) {
+        Result result = new Result();
+
+        Project project = projectMapper.queryByCode(projectCode);
+        projectService.checkProjectAndAuthThrowException(loginUser, project, INSTANCE_UPDATE);
+
+        TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
+        if (taskInstance == null) {
+            logger.error("Task definition can not be found, projectCode:{}, taskInstanceId:{}.", projectCode,
+                    taskInstanceId);
+            putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
+            return new TaskInstanceRemoveCacheResponse(result);
+        }
+        String tagCacheKey = taskInstance.getCacheKey();
+        Pair<Integer, String> taskIdAndCacheKey = TaskCacheUtils.revertCacheKey(tagCacheKey);
+        String cacheKey = taskIdAndCacheKey.getRight();
+        if (StringUtils.isNotEmpty(cacheKey)) {
+            taskInstanceDao.clearCacheByCacheKey(cacheKey);
+        }
+        putMsg(result, Status.SUCCESS);
+        return new TaskInstanceRemoveCacheResponse(result, cacheKey);
+    }
+
 }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
index 0482ab7ef4..89de9b6643 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.api.ApiApplicationServer;
+import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl;
@@ -40,6 +41,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
@@ -93,6 +95,9 @@ public class TaskInstanceServiceTest {
     @Mock
     TaskDefinitionMapper taskDefinitionMapper;
 
+    @Mock
+    TaskInstanceDao taskInstanceDao;
+
     @Test
     public void queryTaskListPaging() {
         long projectCode = 1L;
@@ -341,4 +346,30 @@ public class TaskInstanceServiceTest {
         Assertions.assertEquals(Status.SUCCESS.getCode(), successRes.getCode().intValue());
 
     }
+
+    @Test
+    public void testRemoveTaskInstanceCache() {
+        User user = getAdminUser();
+        long projectCode = 1L;
+        Project project = getProject(projectCode);
+        int taskId = 1;
+        TaskInstance task = getTaskInstance();
+        String cacheKey = "950311f3597f9198976cd3fd69e208e5b9ba6750";
+        task.setCacheKey(cacheKey);
+
+        when(projectMapper.queryByCode(projectCode)).thenReturn(project);
+        when(taskInstanceMapper.selectById(1)).thenReturn(task);
+        when(taskInstanceDao.findTaskInstanceByCacheKey(cacheKey)).thenReturn(task, null);
+        when(taskInstanceDao.updateTaskInstance(task)).thenReturn(true);
+
+        TaskInstanceRemoveCacheResponse response =
+                taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId);
+        Assertions.assertEquals(Status.SUCCESS.getCode(), response.getCode());
+
+        when(taskInstanceMapper.selectById(1)).thenReturn(null);
+        TaskInstanceRemoveCacheResponse responseNotFoundTask =
+                taskInstanceService.removeTaskInstanceCache(user, projectCode, taskId);
+        Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), responseNotFoundTask.getCode());
+
+    }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java
index 80c9cae999..a080a6c826 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java
@@ -26,6 +26,7 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
  * have_arr_variables
  * have_map_variables
  * have_alert
+ * is_cache
  */
 public enum Flag {
 
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
index 09f85d3f17..f24f168679 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
@@ -22,5 +22,6 @@ public enum TaskEventType {
     DELAY,
     RUNNING,
     RESULT,
-    WORKER_REJECT
+    WORKER_REJECT,
+    CACHE,
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 3da375dd0b..18c3a405a7 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -115,6 +115,11 @@ public class TaskDefinition {
      */
     private Flag flag;
 
+    /**
+     * task is cache: yes/no
+     */
+    private Flag isCache;
+
     /**
      * task priority
      */
@@ -281,6 +286,7 @@ public class TaskDefinition {
                 && Objects.equals(taskType, that.taskType)
                 && Objects.equals(taskParams, that.taskParams)
                 && flag == that.flag
+                && isCache == that.isCache
                 && taskPriority == that.taskPriority
                 && Objects.equals(workerGroup, that.workerGroup)
                 && timeoutFlag == that.timeoutFlag
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
index 3701956612..a42f593a9f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
@@ -68,6 +68,7 @@ public class TaskDefinitionLog extends TaskDefinition {
         this.setFailRetryInterval(taskDefinition.getFailRetryInterval());
         this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
         this.setFlag(taskDefinition.getFlag());
+        this.setIsCache(taskDefinition.getIsCache());
         this.setModifyBy(taskDefinition.getModifyBy());
         this.setCpuQuota(taskDefinition.getCpuQuota());
         this.setMemoryMax(taskDefinition.getMemoryMax());
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 7f9c0b7e5f..fbef6cc9fe 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -40,6 +40,7 @@ import java.util.Map;
 
 import lombok.Data;
 
+import com.baomidou.mybatisplus.annotation.FieldStrategy;
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
@@ -187,6 +188,17 @@ public class TaskInstance implements Serializable {
      */
     private Flag flag;
 
+    /**
+     * task is cache: yes/no
+     */
+    private Flag isCache;
+
+    /**
+     * cache_key
+     */
+    @TableField(updateStrategy = FieldStrategy.IGNORED)
+    private String cacheKey;
+
     /**
      * dependency
      */
@@ -409,4 +421,5 @@ public class TaskInstance implements Serializable {
         // task retry does not over time, return false
         return getRetryInterval() * SEC_2_MINUTES_TIME_UNIT < failedTimeInterval;
     }
+
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index faa481e294..14f354f11e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -57,6 +57,10 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
     TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int processInstanceId,
                                           @Param("taskCode") Long taskCode);
 
+    TaskInstance queryByCacheKey(@Param("cacheKey") String cacheKey);
+
+    Boolean clearCacheByCacheKey(@Param("cacheKey") String cacheKey);
+
     List<TaskInstance> queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds") List<Integer> processInstanceIds,
                                                              @Param("taskCodes") List<Long> taskCodes);
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index 11b53bbeeb..8537e86ba1 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -88,6 +88,20 @@ public interface TaskInstanceDao {
      */
     TaskInstance findTaskInstanceById(Integer taskId);
 
+    /**
+     * find task instance by cache_key
+     * @param cacheKey cache key
+     * @return task instance
+     */
+    TaskInstance findTaskInstanceByCacheKey(String cacheKey);
+
+    /**
+     * clear task instance cache by cache_key
+     * @param cacheKey cache key
+     * @return true if the cache was cleared, false if clearing failed
+     */
+    Boolean clearCacheByCacheKey(String cacheKey);
+
     /**
      * find task instance list by id list
      * @param idList task id list
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index a68a0953fd..7566e17783 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
 import java.util.Date;
@@ -164,6 +165,25 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
         return taskInstanceMapper.selectById(taskId);
     }
 
+    @Override
+    public TaskInstance findTaskInstanceByCacheKey(String cacheKey) {
+        if (StringUtils.isEmpty(cacheKey)) {
+            return null;
+        }
+        return taskInstanceMapper.queryByCacheKey(cacheKey);
+    }
+
+    @Override
+    public Boolean clearCacheByCacheKey(String cacheKey) {
+        try {
+            taskInstanceMapper.clearCacheByCacheKey(cacheKey);
+            return true;
+        } catch (Exception e) {
+            logger.error("clear cache by cacheKey failed", e);
+            return false;
+        }
+    }
+
     @Override
     public List<TaskInstance> findTaskInstanceByIdList(List<Integer> idList) {
         if (CollectionUtils.isEmpty(idList)) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
new file mode 100644
index 0000000000..207756af77
--- /dev/null
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class TaskCacheUtils {
+
+    private TaskCacheUtils() {
+        throw new IllegalStateException("Utility class");
+    }
+
+    public static final String MERGE_TAG = "-";
+
+    /**
+     * generate cache key for task instance
+     * the cache key is the SHA-256 hex digest of the following fields joined with "_":
+     * 1. task code and task definition version
+     * 2. task is cache flag and environment config
+     * 3. input VarPool, from upstream task and workflow global parameters
+     * @param taskInstance task instance
+     * @param taskExecutionContext taskExecutionContext
+     * @return cache key
+     */
+    public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext) {
+        List<String> keyElements = new ArrayList<>();
+        keyElements.add(String.valueOf(taskInstance.getTaskCode()));
+        keyElements.add(String.valueOf(taskInstance.getTaskDefinitionVersion()));
+        keyElements.add(String.valueOf(taskInstance.getIsCache().getCode()));
+        keyElements.add(String.valueOf(taskInstance.getEnvironmentConfig()));
+        keyElements.add(getTaskInputVarPoolData(taskInstance, taskExecutionContext));
+        String data = StringUtils.join(keyElements, "_");
+        return DigestUtils.sha256Hex(data);
+    }
+
+    /**
+     * generate cache key for task instance which is cache execute
+     * this key will record which cache task instance will be copied, and cache key will be used
+     * tagCacheKey = sourceTaskId + "-" + cacheKey
+     * @param sourceTaskId source task id
+     * @param cacheKey cache key
+     * @return tagCacheKey
+     */
+    public static String generateTagCacheKey(Integer sourceTaskId, String cacheKey) {
+        return sourceTaskId + MERGE_TAG + cacheKey;
+    }
+
+    /**
+     * revert cache key tag to source task id and cache key
+     * @param tagCacheKey cache key
+     * @return Pair<Integer, String>, first is source task id, second is cache key
+     */
+    public static Pair<Integer, String> revertCacheKey(String tagCacheKey) {
+        Pair<Integer, String> taskIdAndCacheKey;
+        if (tagCacheKey == null) {
+            taskIdAndCacheKey = Pair.of(-1, "");
+            return taskIdAndCacheKey;
+        }
+        if (tagCacheKey.contains(MERGE_TAG)) {
+            String[] split = tagCacheKey.split(MERGE_TAG);
+            if (split.length == 2) {
+                taskIdAndCacheKey = Pair.of(Integer.parseInt(split[0]), split[1]);
+            } else {
+                taskIdAndCacheKey = Pair.of(-1, "");
+            }
+            return taskIdAndCacheKey;
+        } else {
+            return Pair.of(-1, tagCacheKey);
+        }
+    }
+
+    /**
+     * get hash data of task input var pool
+     * there are two parts of task input var pool: from upstream task and workflow global parameters
+     * @param taskInstance task instance
+     * @param context task execution context
+     */
+    public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context) {
+        JsonNode taskParams = JSONUtils.parseObject(taskInstance.getTaskParams());
+
+        // The set of input values considered from localParams in the taskParams
+        Set<String> propertyInSet = JSONUtils.toList(taskParams.get("localParams").toString(), Property.class).stream()
+                .filter(property -> property.getDirect().equals(Direct.IN))
+                .map(Property::getProp).collect(Collectors.toSet());
+
+        // The set of input variables referenced as `${var}` in the task params
+        propertyInSet.addAll(getScriptVarInSet(taskInstance));
+
+        // var pool value from upstream task
+        List<Property> varPool = JSONUtils.toList(taskInstance.getVarPool(), Property.class);
+
+        // var pool value from workflow global parameters
+        if (context.getPrepareParamsMap() != null) {
+            Set<String> taskVarPoolSet = varPool.stream().map(Property::getProp).collect(Collectors.toSet());
+            List<Property> globalContextVarPool = context.getPrepareParamsMap().entrySet().stream()
+                    .filter(entry -> !taskVarPoolSet.contains(entry.getKey()))
+                    .map(Map.Entry::getValue)
+                    .collect(Collectors.toList());
+            varPool.addAll(globalContextVarPool);
+        }
+
+        // only consider var pool value which is in propertyInSet
+        varPool = varPool.stream()
+                .filter(property -> property.getDirect().equals(Direct.IN))
+                .filter(property -> propertyInSet.contains(property.getProp()))
+                .sorted(Comparator.comparing(Property::getProp))
+                .collect(Collectors.toList());
+        return JSONUtils.toJsonString(varPool);
+    }
+
+    /**
+     * collect the variable names referenced as ${var} in the task params
+     * @param taskInstance task instance
+     * @return var in set
+     */
+    public static List<String> getScriptVarInSet(TaskInstance taskInstance) {
+        Pattern pattern = Pattern.compile("\\$\\{(.+?)\\}");
+        Matcher matcher = pattern.matcher(taskInstance.getTaskParams());
+
+        List<String> varInSet = new ArrayList<>();
+        while (matcher.find()) {
+            varInSet.add(matcher.group(1));
+        }
+        return varInSet;
+    }
+
+}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
index d914f9db08..756213a336 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
@@ -19,7 +19,7 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper">
     <sql id="baseSql">
-        id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
+        id, code, name, version, description, project_code, user_id, task_type, task_params, flag, is_cache, task_priority,
         worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
         resource_ids, operator, operate_time, create_time, update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type
     </sql>
@@ -50,14 +50,14 @@
     </select>
     <insert id="batchInsert">
         insert into t_ds_task_definition_log (code, name, version, description, project_code, user_id,
-        task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
+        task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
         timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, operator, operate_time, create_time,
         update_time, task_group_id, task_group_priority, cpu_quota, memory_max, task_execute_type)
         values
         <foreach collection="taskDefinitionLogs" item="taskDefinitionLog" separator=",">
             (#{taskDefinitionLog.code},#{taskDefinitionLog.name},#{taskDefinitionLog.version},#{taskDefinitionLog.description},
             #{taskDefinitionLog.projectCode},#{taskDefinitionLog.userId},#{taskDefinitionLog.taskType},#{taskDefinitionLog.taskParams},
-            #{taskDefinitionLog.flag},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode},
+            #{taskDefinitionLog.flag},#{taskDefinitionLog.isCache},#{taskDefinitionLog.taskPriority},#{taskDefinitionLog.workerGroup},#{taskDefinitionLog.environmentCode},
             #{taskDefinitionLog.failRetryTimes},#{taskDefinitionLog.failRetryInterval},#{taskDefinitionLog.timeoutFlag},#{taskDefinitionLog.timeoutNotifyStrategy},
             #{taskDefinitionLog.timeout},#{taskDefinitionLog.delayTime},#{taskDefinitionLog.resourceIds},#{taskDefinitionLog.operator},#{taskDefinitionLog.operateTime},
             #{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime}, #{taskDefinitionLog.taskGroupId},#{taskDefinitionLog.taskGroupPriority},
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
index a4040bd698..fbc9c793df 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
@@ -19,13 +19,13 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper">
     <sql id="baseSql">
-        id, code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority,
+        id, code, name, version, description, project_code, user_id, task_type, task_params, flag, is_cache, task_priority,
         worker_group, environment_code, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
         resource_ids, create_time, update_time, task_group_id,task_group_priority, cpu_quota, memory_max, task_execute_type
     </sql>
     <sql id="baseSqlV2">
         ${alias}.id, ${alias}.code, ${alias}.name, ${alias}.version, ${alias}.description, ${alias}.project_code, ${alias}.user_id,
-        ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
+        ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.is_cache, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
         ${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout,
         ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id,
         ${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max, ${alias}.task_execute_type
@@ -96,12 +96,12 @@
 
     <insert id="batchInsert">
         insert into t_ds_task_definition (code, name, version, description, project_code, user_id,
-        task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
+        task_type, task_params, flag, is_cache, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,
         timeout_flag, timeout_notify_strategy, timeout, delay_time, resource_ids, create_time, update_time,task_group_id, task_execute_type)
         values
         <foreach collection="taskDefinitions" item="taskDefinition" separator=",">
             (#{taskDefinition.code},#{taskDefinition.name},#{taskDefinition.version},#{taskDefinition.description},
-            #{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag},
+            #{taskDefinition.projectCode},#{taskDefinition.userId},#{taskDefinition.taskType},#{taskDefinition.taskParams},#{taskDefinition.flag},#{taskDefinition.isCache},
             #{taskDefinition.taskPriority},#{taskDefinition.workerGroup},#{taskDefinition.environmentCode},#{taskDefinition.failRetryTimes},
             #{taskDefinition.failRetryInterval},#{taskDefinition.timeoutFlag},#{taskDefinition.timeoutNotifyStrategy},#{taskDefinition.timeout},
             #{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime},
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index fc454a6841..306bb7f49a 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -21,13 +21,13 @@
     <sql id="baseSql">
         id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
         start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
-        flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
+        flag, is_cache, cache_key, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
         first_submit_time, delay_time, task_params, var_pool, dry_run, test_flag, task_group_id, cpu_quota, memory_max, task_execute_type
     </sql>
     <sql id="baseSqlV2">
         ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
         ${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
-        ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
+        ${alias}.flag, ${alias}.is_cache, ${alias}.cache_key, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
         ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type
     </sql>
     <update id="setFailoverByHostAndStateArray">
@@ -200,6 +200,18 @@
         and flag = 1
         limit 1
     </select>
+    <select id="queryByCacheKey" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_task_instance
+        where cache_key = #{cacheKey}
+        limit 1
+    </select>
+    <update id="clearCacheByCacheKey">
+        update t_ds_task_instance
+        set cache_key = null
+        where cache_key = #{cacheKey}
+    </update>
     <select id="queryByProcessInstanceIdsAndTaskCodes" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
         <include refid="baseSql"/>
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index e4b767d6c5..3768373cf5 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -481,6 +481,7 @@ CREATE TABLE t_ds_task_definition
     task_execute_type       int(11) DEFAULT '0',
     task_params             longtext,
     flag                    tinyint(2) DEFAULT NULL,
+    is_cache                tinyint(2) DEFAULT '0',
     task_priority           tinyint(4) DEFAULT '2',
     worker_group            varchar(200) DEFAULT NULL,
     environment_code        bigint(20) DEFAULT '-1',
@@ -517,6 +518,7 @@ CREATE TABLE t_ds_task_definition_log
     task_execute_type       int(11) DEFAULT '0',
     task_params             text,
     flag                    tinyint(2) DEFAULT NULL,
+    is_cache                tinyint(2) DEFAULT '0',
     task_priority           tinyint(4) DEFAULT '2',
     worker_group            varchar(200) DEFAULT NULL,
     environment_code        bigint(20) DEFAULT '-1',
@@ -883,6 +885,8 @@ CREATE TABLE t_ds_task_instance
     app_link                text,
     task_params             longtext,
     flag                    tinyint(4) DEFAULT '1',
+    is_cache                tinyint(2) DEFAULT '0',
+    cache_key               varchar(200) DEFAULT NULL,
     retry_interval          int(4) DEFAULT NULL,
     max_retry_times         int(2) DEFAULT NULL,
     task_instance_priority  int(11) DEFAULT NULL,
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index eeb15e7dbb..9444f5886d 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -481,6 +481,7 @@ CREATE TABLE `t_ds_task_definition` (
   `task_execute_type` int(11) DEFAULT '0' COMMENT 'task execute type: 0-batch, 1-stream',
   `task_params` longtext COMMENT 'job custom parameters',
   `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
+  `is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available',
   `task_priority` tinyint(4) DEFAULT '2' COMMENT 'job priority',
   `worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
   `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
@@ -516,6 +517,7 @@ CREATE TABLE `t_ds_task_definition_log` (
   `task_execute_type` int(11) DEFAULT '0' COMMENT 'task execute type: 0-batch, 1-stream',
   `task_params` longtext COMMENT 'job custom parameters',
   `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available',
+  `is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available',
   `task_priority` tinyint(4) DEFAULT '2' COMMENT 'job priority',
   `worker_group` varchar(200) DEFAULT NULL COMMENT 'worker grouping',
   `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
@@ -877,6 +879,8 @@ CREATE TABLE `t_ds_task_instance` (
   `app_link` text COMMENT 'yarn app id',
   `task_params` longtext COMMENT 'job custom parameters',
   `flag` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
+  `is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available',
+  `cache_key` varchar(200) DEFAULT NULL COMMENT 'cache_key',
   `retry_interval` int(4) DEFAULT NULL COMMENT 'retry interval when task failed ',
   `max_retry_times` int(2) DEFAULT NULL COMMENT 'max retry times',
   `task_instance_priority` int(11) DEFAULT NULL COMMENT 'task instance priority:0 Highest,1 High,2 Medium,3 Low,4 Lowest',
@@ -894,7 +898,8 @@ CREATE TABLE `t_ds_task_instance` (
   `test_flag`  tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run',
   PRIMARY KEY (`id`),
   KEY `process_instance_id` (`process_instance_id`) USING BTREE,
-  KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE
+  KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE,
+  KEY `idx_cache_key` (`cache_key`) USING BTREE
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
 -- ----------------------------
diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 32a57342d4..121729ba87 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -398,6 +398,7 @@ CREATE TABLE t_ds_task_definition (
   task_execute_type int DEFAULT '0',
   task_params text ,
   flag int DEFAULT NULL ,
+  is_cache int DEFAULT '0',
   task_priority int DEFAULT '2' ,
   worker_group varchar(255) DEFAULT NULL ,
   environment_code bigint DEFAULT '-1',
@@ -436,6 +437,7 @@ CREATE TABLE t_ds_task_definition_log (
   task_execute_type int DEFAULT '0',
   task_params text ,
   flag int DEFAULT NULL ,
+  is_cache int DEFAULT '0' ,
   task_priority int DEFAULT '2' ,
   worker_group varchar(255) DEFAULT NULL ,
   environment_code bigint DEFAULT '-1',
@@ -777,6 +779,8 @@ CREATE TABLE t_ds_task_instance (
   app_link text ,
   task_params text ,
   flag int DEFAULT '1' ,
+  is_cache int DEFAULT '0',
+  cache_key varchar(200) DEFAULT NULL,
   retry_interval int DEFAULT NULL ,
   max_retry_times int DEFAULT NULL ,
   task_instance_priority int DEFAULT NULL ,
@@ -796,6 +800,7 @@ CREATE TABLE t_ds_task_instance (
 ) ;
 
 create index idx_task_instance_code_version on t_ds_task_instance (task_code, task_definition_version);
+create index idx_cache_key on t_ds_task_instance (cache_key);
 
 --
 -- Table structure for table t_ds_tenant
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
index d852683640..a12439debc 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
@@ -120,3 +120,101 @@ d//
 delimiter ;
 CALL uc_dolphin_T_t_ds_task_instance_R_test_flag;
 DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_test_flag;
+
+-- uc_dolphin_T_t_ds_task_definition_R_is_cache
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_definition_R_is_cache;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_task_definition_R_is_cache()
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_task_definition'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='is_cache')
+   THEN
+ALTER TABLE t_ds_task_definition ADD `is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available';
+END IF;
+END;
+
+d//
+
+delimiter ;
+CALL uc_dolphin_T_t_ds_task_definition_R_is_cache;
+DROP PROCEDURE uc_dolphin_T_t_ds_task_definition_R_is_cache;
+
+
+-- uc_dolphin_T_t_ds_task_definition_log_R_is_cache
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_definition_log_R_is_cache;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_task_definition_log_R_is_cache()
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_task_definition_log'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='is_cache')
+   THEN
+ALTER TABLE t_ds_task_definition_log ADD `is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available';
+END IF;
+END;
+
+d//
+delimiter ;
+CALL uc_dolphin_T_t_ds_task_definition_log_R_is_cache;
+DROP PROCEDURE uc_dolphin_T_t_ds_task_definition_log_R_is_cache;
+
+
+-- uc_dolphin_T_t_ds_task_instance_R_is_cache
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_R_is_cache;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_R_is_cache()
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_task_instance'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='is_cache')
+   THEN
+ALTER TABLE t_ds_task_instance ADD `is_cache` tinyint(2) DEFAULT '0' COMMENT '0 not available, 1 available';
+END IF;
+END;
+
+d//
+delimiter ;
+CALL uc_dolphin_T_t_ds_task_instance_R_is_cache;
+DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_is_cache;
+
+-- uc_dolphin_T_t_ds_task_instance_R_cache_key
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_R_cache_key;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_R_cache_key()
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_task_instance'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='cache_key')
+   THEN
+ALTER TABLE t_ds_task_instance ADD `cache_key` varchar(255) DEFAULT null COMMENT 'cache key';
+END IF;
+END;
+
+d//
+delimiter ;
+CALL uc_dolphin_T_t_ds_task_instance_R_cache_key;
+DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_cache_key;
+
+
+-- ALTER TABLE `t_ds_task_instance` ADD KEY `cache_key`( `cache_key`);
+drop PROCEDURE if EXISTS add_t_ds_task_instance_idx_cache_key;
+delimiter d//
+CREATE PROCEDURE add_t_ds_task_instance_idx_cache_key()
+BEGIN
+    IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.STATISTICS
+        WHERE TABLE_NAME='t_ds_task_instance'
+        AND TABLE_SCHEMA=(SELECT DATABASE())
+        AND INDEX_NAME='cache_key')
+    THEN
+ALTER TABLE `t_ds_task_instance` ADD KEY `cache_key`( `cache_key` );
+END IF;
+END;
+d//
+delimiter ;
+CALL add_t_ds_task_instance_idx_cache_key;
+DROP PROCEDURE add_t_ds_task_instance_idx_cache_key;
diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
index 8b6dbba36b..c88f0593c4 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
@@ -119,3 +119,17 @@ d//
 delimiter ;
 select uc_dolphin_T_t_ds_task_instance_R_test_flag();
 DROP FUNCTION uc_dolphin_T_t_ds_task_instance_R_test_flag();
+
+ALTER TABLE t_ds_task_definition DROP COLUMN IF EXISTS is_cache;
+ALTER TABLE t_ds_task_definition ADD COLUMN IF NOT EXISTS is_cache int DEFAULT '0';
+
+ALTER TABLE t_ds_task_definition_log DROP COLUMN IF EXISTS is_cache;
+ALTER TABLE t_ds_task_definition_log ADD COLUMN IF NOT EXISTS is_cache int DEFAULT '0';
+
+ALTER TABLE t_ds_task_instance DROP COLUMN IF EXISTS is_cache;
+ALTER TABLE t_ds_task_instance ADD COLUMN IF NOT EXISTS is_cache int DEFAULT '0';
+
+ALTER TABLE t_ds_task_instance ADD COLUMN IF NOT EXISTS cache_key varchar(200) DEFAULT NULL;
+ALTER TABLE t_ds_task_instance DROP COLUMN IF EXISTS cacke_key;
+
+CREATE INDEX IF NOT EXISTS idx_cache_key ON t_ds_task_instance USING Btree("cache_key");
diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java
new file mode 100644
index 0000000000..8d3dbe655b
--- /dev/null
+++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtilsTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.utils;
+
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@link TaskCacheUtils}.
+ */
+class TaskCacheUtilsTest {
+
+    private TaskInstance taskInstance;
+
+    private TaskExecutionContext taskExecutionContext;
+
+    /**
+     * Builds the shared fixture:
+     * a task instance whose local params declare IN parameters {@code a} (empty value) and {@code b} (fixed
+     * value), whose script references {@code ${c}} and {@code ${d}}, and whose var pool holds {@code c} and
+     * {@code k}; plus an execution context whose prepared params supply {@code a=aa}.
+     */
+    @BeforeEach
+    void setUp() {
+        String taskParams = "{\n" +
+                "  \"localParams\": [\n" +
+                "    {\n" +
+                "      \"prop\": \"a\",\n" +
+                "      \"direct\": \"IN\",\n" +
+                "      \"type\": \"VARCHAR\",\n" +
+                "      \"value\": \"\"\n" +
+                "    },\n" +
+                "    {\n" +
+                "      \"prop\": \"b\",\n" +
+                "      \"direct\": \"IN\",\n" +
+                "      \"type\": \"VARCHAR\",\n" +
+                "      \"value\": \"bb\"\n" +
+                "    }\n" +
+                "  ],\n" +
+                "  \"rawScript\": \"echo ${c}\\necho ${d}\",\n" +
+                "  \"resourceList\": []\n" +
+                "}";
+
+        String varPool = "[\n" +
+                "  {\n" +
+                "    \"prop\": \"c\",\n" +
+                "    \"direct\": \"IN\",\n" +
+                "    \"type\": \"VARCHAR\",\n" +
+                "    \"value\": \"cc\"\n" +
+                "  },\n" +
+                "  {\n" +
+                "    \"prop\": \"k\",\n" +
+                "    \"direct\": \"IN\",\n" +
+                "    \"type\": \"VARCHAR\",\n" +
+                "    \"value\": \"kk\"\n" +
+                "  }\n" +
+                "]";
+
+        taskInstance = new TaskInstance();
+        taskInstance.setTaskParams(taskParams);
+        taskInstance.setVarPool(varPool);
+        taskInstance.setTaskCode(123L);
+        taskInstance.setTaskDefinitionVersion(1);
+        taskInstance.setIsCache(Flag.YES);
+
+        Map<String, Property> prepareParamsMap = new HashMap<>();
+        prepareParamsMap.put("a", newInProperty("a", "aa"));
+        taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setPrepareParamsMap(prepareParamsMap);
+    }
+
+    /**
+     * Creates an IN-direction VARCHAR property with the given name and value.
+     */
+    private static Property newInProperty(String prop, String value) {
+        Property property = new Property();
+        property.setProp(prop);
+        property.setDirect(Direct.IN);
+        property.setType(DataType.VARCHAR);
+        property.setValue(value);
+        return property;
+    }
+
+    @Test
+    void testRevertCacheKey() {
+        // null input yields the "no task id / empty key" pair
+        Pair<Integer, String> taskIdAndCacheKey1 = TaskCacheUtils.revertCacheKey(null);
+        Assertions.assertEquals(Pair.of(-1, ""), taskIdAndCacheKey1);
+
+        // an untagged key is returned as-is with task id -1
+        Pair<Integer, String> taskIdAndCacheKey2 = TaskCacheUtils.revertCacheKey("123");
+        Assertions.assertEquals(Pair.of(-1, "123"), taskIdAndCacheKey2);
+
+        // a tagged key "<taskId>-<key>" is split into its two parts
+        Pair<Integer, String> taskIdAndCacheKey3 = TaskCacheUtils.revertCacheKey("1-123");
+        Assertions.assertEquals(Pair.of(1, "123"), taskIdAndCacheKey3);
+
+        // more than two segments is treated like an invalid key
+        Pair<Integer, String> taskIdAndCacheKey4 = TaskCacheUtils.revertCacheKey("1-123-4");
+        Assertions.assertEquals(Pair.of(-1, ""), taskIdAndCacheKey4);
+    }
+
+    @Test
+    void testGetScriptVarInSet() {
+        List<String> scriptVarInSet = TaskCacheUtils.getScriptVarInSet(taskInstance);
+        List<String> expected = new ArrayList<>(Arrays.asList("c", "d"));
+        Assertions.assertEquals(expected, scriptVarInSet);
+    }
+
+    @Test
+    void testGetTaskInputVarPoolData() {
+        // only a=aa and c=cc will influence the result,
+        // b=bb is a fixed value, will be considered in task version
+        // k=kk is not in task params, will be ignored
+        String expected =
+                "[{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"aa\"},{\"prop\":\"c\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"cc\"}]";
+        Assertions.assertEquals(expected, TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext));
+    }
+
+    @Test
+    void testGenerateCacheKey() {
+        String cacheKeyBase = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+
+        // i will not influence the result, because task instance not use it
+        taskExecutionContext.getPrepareParamsMap().put("i", newInProperty("i", "ii"));
+        String cacheKeyNew = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+        Assertions.assertEquals(cacheKeyBase, cacheKeyNew);
+
+        // d will influence the result, because task instance use it.
+        // It is registered under its own name "d" so it does not replace the "i" entry added above.
+        taskExecutionContext.getPrepareParamsMap().put("d", newInProperty("d", "dd"));
+        String cacheKeyD = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+        Assertions.assertNotEquals(cacheKeyBase, cacheKeyD);
+
+        // task definition version is changed, so cache key changed
+        taskInstance.setTaskDefinitionVersion(100);
+        String cacheKeyE = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+        Assertions.assertNotEquals(cacheKeyD, cacheKeyE);
+
+        // EnvironmentConfig is changed, so cache key changed
+        taskInstance.setEnvironmentConfig("export PYTHON_HOME=/bin/python3");
+        String cacheKeyF = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
+        Assertions.assertNotEquals(cacheKeyE, cacheKeyF);
+    }
+
+    @Test
+    void testGetCacheKey() {
+        // the tagged key has the form "<taskId>-<key>"
+        String cacheKey = TaskCacheUtils.generateTagCacheKey(1, "123");
+        Assertions.assertEquals("1-123", cacheKey);
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 359aa40cf9..6de30a7e66 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -18,11 +18,13 @@
 package org.apache.dolphinscheduler.server.master.consumer;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
 import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
@@ -217,6 +219,11 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
                 }
             }
 
+            // check whether the task can be served from the cache; if so, skip dispatching it
+            if (checkIsCacheExecution(taskInstance, context)) {
+                return true;
+            }
+
             result = dispatcher.dispatch(executionContext);
 
             if (result) {
@@ -276,4 +283,46 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
         }
         return false;
     }
+
+    /**
+     * Decide whether a task can be served from the task cache instead of being dispatched.
+     * <p>
+     * If the task is flagged as a cache task and a task instance with the same generated cache key is found,
+     * a cache event is queued for it and the task is not dispatched. If no such instance is found, the task
+     * instance is tagged with its own id and cache key and will be dispatched as usual.
+     * Any exception raised while checking is logged and the task falls back to normal dispatching.
+     *
+     * @param taskInstance taskInstance
+     * @param context context
+     * @return true if we will not dispatch this task, false if we will dispatch this task
+     */
+    private boolean checkIsCacheExecution(TaskInstance taskInstance, TaskExecutionContext context) {
+        try {
+            // only tasks explicitly flagged YES take part in caching; a missing (null) flag is treated
+            // as "not a cache task" instead of surfacing as a NullPointerException in the error log
+            if (taskInstance.getIsCache() != Flag.YES) {
+                return false;
+            }
+            // check if task is cache execution
+            String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context);
+            TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
+            // if we can find the cache task instance, we will add cache event, and return true.
+            if (cacheTaskInstance != null) {
+                logger.info("Task {} is cache, no need to dispatch, task instance id: {}",
+                        taskInstance.getName(), taskInstance.getId());
+                addCacheEvent(taskInstance, cacheTaskInstance);
+                // tag the key with the id of the instance the result is taken from
+                taskInstance.setCacheKey(TaskCacheUtils.generateTagCacheKey(cacheTaskInstance.getId(), cacheKey));
+                return true;
+            } else {
+                // if we can not find cache task, update cache key, and return false. the task will be dispatched
+                taskInstance.setCacheKey(TaskCacheUtils.generateTagCacheKey(taskInstance.getId(), cacheKey));
+            }
+        } catch (Exception e) {
+            // best effort: a failing cache lookup must never prevent the task from being dispatched
+            logger.error("checkIsCacheExecution error", e);
+        }
+        return false;
+    }
+
+    /**
+     * Queue a cache event that links the given task instance to the cached task instance.
+     * Does nothing when {@code cacheTaskInstance} is null.
+     *
+     * @param taskInstance      task instance that hit the cache
+     * @param cacheTaskInstance task instance found for the cache key, may be null
+     */
+    private void addCacheEvent(TaskInstance taskInstance, TaskInstance cacheTaskInstance) {
+        if (cacheTaskInstance != null) {
+            int processInstanceId = taskInstance.getProcessInstanceId();
+            int currentTaskInstanceId = taskInstance.getId();
+            int cachedTaskInstanceId = cacheTaskInstance.getId();
+            taskEventService.addEvent(
+                    TaskEvent.newCacheEvent(processInstanceId, currentTaskInstanceId, cachedTaskInstanceId));
+        }
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java
new file mode 100644
index 0000000000..7f222bd862
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.enums.TaskEventType;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+import java.util.Optional;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TaskCacheEventHandler implements TaskEventHandler {
+
+    @Autowired
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Autowired
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Autowired
+    private DataQualityResultOperator dataQualityResultOperator;
+
+    @Autowired
+    private ProcessService processService;
+
+    @Autowired
+    private TaskInstanceDao taskInstanceDao;
+
+    /**
+     * handle CACHE task event
+     * copy a new task instance from the cache task has been successfully run
+     * @param taskEvent task event
+     */
+    @Override
+    public void handleTaskEvent(TaskEvent taskEvent) {
+        int taskInstanceId = taskEvent.getTaskInstanceId();
+        int processInstanceId = taskEvent.getProcessInstanceId();
+
+        WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(
+                processInstanceId);
+        Optional<TaskInstance> taskInstanceOptional = workflowExecuteRunnable.getTaskInstance(taskInstanceId);
+        if (!taskInstanceOptional.isPresent()) {
+            return;
+        }
+        TaskInstance taskInstance = taskInstanceOptional.get();
+        dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
+
+        TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceById(taskEvent.getCacheTaskInstanceId());
+
+        // keep the task instance fields
+        cacheTaskInstance.setId(taskInstance.getId());
+        cacheTaskInstance.setProcessInstanceId(processInstanceId);
+        cacheTaskInstance.setProcessInstanceName(taskInstance.getProcessInstanceName());
+        cacheTaskInstance.setProcessInstance(taskInstance.getProcessInstance());
+        cacheTaskInstance.setProcessDefine(taskInstance.getProcessDefine());
+        cacheTaskInstance.setStartTime(taskInstance.getSubmitTime());
+        cacheTaskInstance.setSubmitTime(taskInstance.getSubmitTime());
+        cacheTaskInstance.setEndTime(new Date());
+        cacheTaskInstance.setFlag(Flag.YES);
+
+        TaskInstanceUtils.copyTaskInstance(cacheTaskInstance, taskInstance);
+
+        processService.changeOutParam(taskInstance);
+
+        taskInstanceDao.updateTaskInstance(taskInstance);
+        TaskStateEvent stateEvent = TaskStateEvent.builder()
+                .processInstanceId(taskEvent.getProcessInstanceId())
+                .taskInstanceId(taskEvent.getTaskInstanceId())
+                .status(taskEvent.getState())
+                .type(StateEventType.TASK_STATE_CHANGE)
+                .build();
+
+        workflowExecuteThreadPool.submitStateEvent(stateEvent);
+
+    }
+
+    @Override
+    public TaskEventType getHandleEventType() {
+        return TaskEventType.CACHE;
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
index 72fc02665e..47248a4045 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java
@@ -90,6 +90,8 @@ public class TaskEvent {
      */
     private String varPool;
 
+    private int cacheTaskInstanceId;
+
     /**
      * channel
      */
@@ -147,4 +149,13 @@ public class TaskEvent {
         event.setEvent(TaskEventType.WORKER_REJECT);
         return event;
     }
+
+    /**
+     * Create a {@link TaskEventType#CACHE} event.
+     *
+     * @param processInstanceId   id of the process instance the task belongs to
+     * @param taskInstanceId      id of the task instance the event is for
+     * @param cacheTaskInstanceId id of the cached task instance
+     * @return the new cache event
+     */
+    public static TaskEvent newCacheEvent(int processInstanceId, int taskInstanceId, int cacheTaskInstanceId) {
+        TaskEvent cacheEvent = new TaskEvent();
+        cacheEvent.setEvent(TaskEventType.CACHE);
+        cacheEvent.setCacheTaskInstanceId(cacheTaskInstanceId);
+        cacheEvent.setTaskInstanceId(taskInstanceId);
+        cacheEvent.setProcessInstanceId(processInstanceId);
+        return cacheEvent;
+    }
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index ab0af2be5b..cd6967cf10 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -60,6 +60,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
 import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
 import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
 import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@@ -95,6 +96,7 @@ import org.apache.dolphinscheduler.service.utils.LoggerUtils;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.lang3.tuple.Pair;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -411,6 +413,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
                 // todo: merge the last taskInstance
                 processInstance.setVarPool(taskInstance.getVarPool());
                 processInstanceDao.upsertProcessInstance(processInstance);
+                // save the cache key only if the task is defined as a cache task and has succeeded
+                if (taskInstance.getIsCache().equals(Flag.YES)) {
+                    saveCacheTaskInstance(taskInstance);
+                }
                 if (!processInstance.isBlocked()) {
                     submitPostNode(Long.toString(taskInstance.getTaskCode()));
                 }
@@ -1196,6 +1202,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         taskInstance.setCpuQuota(taskNode.getCpuQuota());
         taskInstance.setMemoryMax(taskNode.getMemoryMax());
 
+        taskInstance.setIsCache(taskNode.getIsCache() == Flag.YES.getCode() ? Flag.YES : Flag.NO);
+
         // task instance priority
         if (taskNode.getTaskInstancePriority() == null) {
             taskInstance.setTaskInstancePriority(Priority.MEDIUM);
@@ -2125,6 +2133,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
         errorTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
     }
 
+    /**
+     * Persist the task instance's cache key without its task-id tag.
+     * <p>
+     * The key is only rewritten and saved when the id encoded in the tagged key equals the id of the given
+     * task instance; otherwise nothing is changed. A failing update is logged and not rethrown.
+     *
+     * @param taskInstance task instance whose cache key should be saved
+     */
+    private void saveCacheTaskInstance(TaskInstance taskInstance) {
+        Pair<Integer, String> idAndKey = TaskCacheUtils.revertCacheKey(taskInstance.getCacheKey());
+        Integer taggedTaskId = idAndKey.getLeft();
+        if (!taggedTaskId.equals(taskInstance.getId())) {
+            return;
+        }
+        taskInstance.setCacheKey(idAndKey.getRight());
+        try {
+            taskInstanceDao.updateTaskInstance(taskInstance);
+        } catch (Exception e) {
+            logger.error("update task instance cache key failed", e);
+        }
+    }
+
     private enum WorkflowRunnableStatus {
         CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED,
         ;
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java
new file mode 100644
index 0000000000..958cd3a6da
--- /dev/null
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/event/TaskCacheEventHandlerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.event;
+
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
+import org.apache.dolphinscheduler.server.master.utils.DataQualityResultOperator;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Optional;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit test for {@link TaskCacheEventHandler}.
+ */
+@ExtendWith(MockitoExtension.class)
+class TaskCacheEventHandlerTest {
+
+    @InjectMocks
+    private TaskCacheEventHandler taskCacheEventHandler;
+
+    @Mock
+    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
+
+    @Mock
+    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
+
+    @Mock
+    private DataQualityResultOperator dataQualityResultOperator;
+
+    @Mock
+    private ProcessService processService;
+
+    @Mock
+    private TaskInstanceDao taskInstanceDao;
+
+    /**
+     * After a cache event is handled, the current task instance must keep its own id and process instance id,
+     * be flagged YES, and be persisted.
+     */
+    @Test
+    void testHandleTaskEvent() {
+        int processInstanceId = 1;
+        int taskInstanceId = 2;
+        int cacheTaskInstanceId = 3;
+        int cacheProcessInstanceId = 4;
+
+        TaskEvent taskEvent = Mockito.mock(TaskEvent.class);
+        Mockito.when(taskEvent.getTaskInstanceId()).thenReturn(taskInstanceId);
+        Mockito.when(taskEvent.getProcessInstanceId()).thenReturn(processInstanceId);
+        Mockito.when(taskEvent.getCacheTaskInstanceId()).thenReturn(cacheTaskInstanceId);
+
+        // the task instance whose result is reused
+        TaskInstance cacheTaskInstance = new TaskInstance();
+        cacheTaskInstance.setId(cacheTaskInstanceId);
+        cacheTaskInstance.setProcessInstanceId(cacheProcessInstanceId);
+        cacheTaskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
+        Mockito.when(taskInstanceDao.findTaskInstanceById(cacheTaskInstanceId)).thenReturn(cacheTaskInstance);
+
+        // the task instance of the running workflow that hit the cache
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setTaskParams(JSONUtils.toJsonString(new HashMap<>()));
+        taskInstance.setId(taskInstanceId);
+        taskInstance.setProcessInstanceId(processInstanceId);
+        taskInstance.setProcessInstanceName("test");
+        taskInstance.setProcessInstance(new ProcessInstance());
+        taskInstance.setProcessDefine(new ProcessDefinition());
+        taskInstance.setSubmitTime(new Date());
+
+        WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class);
+        Mockito.when(processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId))
+                .thenReturn(workflowExecuteRunnable);
+        // use a real Optional: java.util.Optional is a final JDK value type and should not be mocked
+        Mockito.when(workflowExecuteRunnable.getTaskInstance(taskInstanceId))
+                .thenReturn(Optional.of(taskInstance));
+
+        taskCacheEventHandler.handleTaskEvent(taskEvent);
+
+        Assertions.assertEquals(Flag.YES, taskInstance.getFlag());
+        Assertions.assertEquals(taskInstanceId, taskInstance.getId());
+        Assertions.assertEquals(processInstanceId, taskInstance.getProcessInstanceId());
+        // the updated task instance must be written back
+        Mockito.verify(taskInstanceDao).updateTaskInstance(taskInstance);
+    }
+
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java
index 05f1480c12..1931e9e2e9 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java
@@ -78,6 +78,8 @@ public class TaskNode {
      */
     private String runFlag;
 
+    private int isCache;
+
     /**
      * the front field
      */
@@ -278,6 +280,14 @@ public class TaskNode {
         this.runFlag = runFlag;
     }
 
+    /**
+     * @return the cache flag of this task node as an int code
+     */
+    public int getIsCache() {
+        return isCache;
+    }
+
+    /**
+     * @param isCache the cache flag of this task node as an int code
+     */
+    public void setIsCache(int isCache) {
+        this.isCache = isCache;
+    }
+
     public boolean isForbidden() {
         // skip stream task when run DAG
         if (taskExecuteType == TaskExecuteType.STREAM) {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index a79381e47c..ccf2f88c89 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2328,6 +2328,7 @@ public class ProcessServiceImpl implements ProcessService {
                 taskNode.setCpuQuota(taskDefinitionLog.getCpuQuota());
                 taskNode.setMemoryMax(taskDefinitionLog.getMemoryMax());
                 taskNode.setTaskExecuteType(taskDefinitionLog.getTaskExecuteType());
+                taskNode.setIsCache(taskDefinitionLog.getIsCache().getCode());
                 taskNodeList.add(taskNode);
             }
         }
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 12fb2c69a8..ac83839d50 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.any;
 
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
 import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
 import org.apache.dolphinscheduler.common.enums.UserType;
@@ -653,6 +654,7 @@ public class ProcessServiceTest {
         taskDefinition.setVersion(2);
         taskDefinition.setCreateTime(new Date());
         taskDefinition.setUpdateTime(new Date());
+        taskDefinition.setIsCache(Flag.NO);
 
         TaskDefinitionLog td2 = new TaskDefinitionLog();
         td2.setCode(2L);
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts
index 1f9f49d50c..4beee4b909 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -277,7 +277,8 @@ export default {
     alarm_group: 'Alarm group',
     startup_parameter: 'Startup Parameter',
     whether_dry_run: 'Whether Dry-Run',
-    please_choose: 'Please Choose'
+    please_choose: 'Please Choose',
+    remove_task_cache: 'Clear cache'
   },
   dag: {
     create: 'Create Workflow',
@@ -325,6 +326,7 @@ export default {
     online: 'Online'
   },
   node: {
+    is_cache: "Cache Execution",
     jvm_args: 'Java VM Parameters',
     jvm_args_tips: 'Please enter virtual machine parameters',
     run_type: 'Run Type',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index ce8589630e..dbce97e30f 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -278,7 +278,8 @@ export default {
     alarm_group: '告警组',
     startup_parameter: '启动参数',
     whether_dry_run: '是否空跑',
-    please_choose: '请选择'
+    please_choose: '请选择',
+    remove_task_cache: '清除缓存'
   },
   dag: {
     create: '创建工作流',
@@ -326,6 +327,7 @@ export default {
     online: '已上线'
   },
   node: {
+    is_cache: "缓存执行",
     is_module_path: '使用模块路径',
     run_type: '运行类型',
     jvm_args: '虚拟机参数',
diff --git a/dolphinscheduler-ui/src/service/modules/task-instances/index.ts b/dolphinscheduler-ui/src/service/modules/task-instances/index.ts
index 4de389f131..9100ab8136 100644
--- a/dolphinscheduler-ui/src/service/modules/task-instances/index.ts
+++ b/dolphinscheduler-ui/src/service/modules/task-instances/index.ts
@@ -54,3 +54,10 @@ export function savePoint(projectCode: number, taskId: number): any {
     method: 'post'
   })
 }
+
+/**
+ * Send a DELETE request to the `remove-cache` endpoint of a task instance.
+ *
+ * @param projectCode - code of the project the task instance belongs to
+ * @param taskId - id of the task instance
+ * @returns whatever the `axios` call returns
+ */
+export function removeTaskInstanceCache(projectCode: number, taskId: number): any {
+  const url = `projects/${projectCode}/task-instances/${taskId}/remove-cache`
+  return axios({ url, method: 'delete' })
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
index 678f7d2ce3..e7b4ab06f8 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
@@ -17,6 +17,7 @@
 
 export { useName } from './use-name'
 export { useRunFlag } from './use-run-flag'
+export { useCache } from './use-cache'
 export { useDescription } from './use-description'
 export { useTaskPriority } from './use-task-priority'
 export { useWorkerGroup } from './use-worker-group'
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-cache.ts
similarity index 74%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
copy to dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-cache.ts
index 09f85d3f17..75f2454a13 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskEventType.java
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-cache.ts
@@ -15,12 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.common.enums;
+import { useI18n } from 'vue-i18n'
+import type { IJsonItem } from '../types'
 
-public enum TaskEventType {
-    DISPATCH,
-    DELAY,
-    RUNNING,
-    RESULT,
-    WORKER_REJECT
+/**
+ * Build the form item for the task node's cache switch.
+ *
+ * @returns a `switch` item bound to the `isCache` field, labelled with the
+ * `project.node.is_cache` translation and given a span of 12
+ */
+export function useCache(): IJsonItem {
+  const { t } = useI18n()
+  return {
+    type: 'switch',
+    field: 'isCache',
+    name: t('project.node.is_cache'),
+    span: 12
+  }
 }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-run-flag.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-run-flag.ts
index e320400791..ab61b88ce9 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-run-flag.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-run-flag.ts
@@ -34,6 +34,7 @@ export function useRunFlag(): IJsonItem {
     type: 'radio',
     field: 'flag',
     name: t('project.node.run_flag'),
-    options: options
+    options: options,
+    span: 12
   }
 }
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
index c8aecc2b64..ea804dc1d7 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
@@ -478,6 +478,7 @@ export function formatParams(data: INodeData): {
         : '0',
       failRetryTimes: data.failRetryTimes ? String(data.failRetryTimes) : '0',
       flag: data.flag,
+      isCache: data.isCache ? 'YES' : 'NO',
       name: data.name,
       taskGroupId: data.taskGroupId,
       taskGroupPriority: data.taskGroupPriority,
@@ -526,6 +527,7 @@ export function formatModel(data: ITaskData) {
     ...omit(data.taskParams, ['resourceList', 'mainJar', 'localParams']),
     environmentCode: data.environmentCode === -1 ? null : data.environmentCode,
     timeoutFlag: data.timeoutFlag === 'OPEN',
+    isCache: data.isCache === 'YES',
     timeoutNotifyStrategy: data.timeoutNotifyStrategy
       ? [data.timeoutNotifyStrategy]
       : [],
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-chunjun.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-chunjun.ts
index 65f714d1ed..14c738c86c 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-chunjun.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-chunjun.ts
@@ -56,6 +56,7 @@ export function useChunjun({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts
index 0110570dd9..0753e60d84 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-data-quality.ts
@@ -65,6 +65,7 @@ export function useDataQuality({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datasync.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datasync.ts
index 0dc5737870..1aad25fde3 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datasync.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datasync.ts
@@ -65,6 +65,7 @@ export function useDatasync({
          Fields.useName(from),
          ...extra,
          Fields.useRunFlag(),
+         Fields.useCache(),
          Fields.useDescription(),
          Fields.useTaskPriority(),
          Fields.useWorkerGroup(),
@@ -79,4 +80,4 @@ export function useDatasync({
      ] as IJsonItem[],
      model
  }
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts
index 7dbcf04abd..0df09e14f2 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-datax.ts
@@ -59,6 +59,7 @@ export function useDataX({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dinky.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dinky.ts
index 05dbaa60de..d90e191967 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dinky.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dinky.ts
@@ -51,6 +51,7 @@ export function useDinky({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dms.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dms.ts
index c253067f84..1c9399aec6 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dms.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dms.ts
@@ -66,6 +66,7 @@ export function useDms({
       Fields.useName(from),
       ...extra,
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts
index 284c9b1014..18124ad4ec 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-dvc.ts
@@ -52,6 +52,7 @@ export function useDvc({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
index c62bb9e44f..6a4dd664ae 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr.ts
@@ -53,6 +53,7 @@ export function useEmr({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts
index d55a7b2fcb..315514f238 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink-stream.ts
@@ -61,6 +61,7 @@ export function useFlinkStream({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts
index d5671cb46d..21a81046cc 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-flink.ts
@@ -61,6 +61,7 @@ export function useFlink({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-hive-cli.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-hive-cli.ts
index 9c3184bf83..f1ae3b302f 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-hive-cli.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-hive-cli.ts
@@ -65,6 +65,7 @@ export function useHiveCli({
       Fields.useName(from),
       ...extra,
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-http.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-http.ts
index 283910eab3..a1fc52757a 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-http.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-http.ts
@@ -58,6 +58,7 @@ export function useHttp({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-java.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-java.ts
index d079888a23..32b67a46a3 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-java.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-java.ts
@@ -73,6 +73,7 @@ export function useJava({
       Fields.useName(from),
       ...extra,
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts
index a718320be9..dddd8fee19 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-jupyter.ts
@@ -53,6 +53,7 @@ export function useJupyter({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
index f13f7c74f0..53e8143a14 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-k8s.ts
@@ -52,6 +52,7 @@ export function useK8s({
       Fields.useName(),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-kubeflow.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-kubeflow.ts
index be784c0bca..aaef1d6632 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-kubeflow.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-kubeflow.ts
@@ -51,6 +51,7 @@ export function useKubeflow({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-linkis.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-linkis.ts
index 8b0afaf93b..5bfaf265e0 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-linkis.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-linkis.ts
@@ -62,6 +62,7 @@ export function useLinkis({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mlflow.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mlflow.ts
index 268c5136fa..efb824c118 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mlflow.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mlflow.ts
@@ -59,6 +59,7 @@ export function useMlflow({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts
index 34e4a86f46..c0b5b79b09 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-mr.ts
@@ -52,6 +52,7 @@ export function useMr({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts
index 299cd8bd61..e597828f53 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts
@@ -55,6 +55,7 @@ export function useOpenmldb({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pigeon.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pigeon.ts
index 8e03244c73..5d99554b79 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pigeon.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pigeon.ts
@@ -51,6 +51,7 @@ export function usePigeon({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-procedure.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-procedure.ts
index 5e9ddb31de..d45e7a3cad 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-procedure.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-procedure.ts
@@ -55,6 +55,7 @@ export function useProcedure({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts
index 5b06937016..91db8b6efd 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-python.ts
@@ -55,6 +55,7 @@ export function usePython({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts
index 302f73e758..d43e819080 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-pytorch.ts
@@ -70,6 +70,7 @@ export function usePytorch({
       Fields.useName(from),
       ...extra,
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
index 89fab36525..3bcb13e052 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sagemaker.ts
@@ -51,6 +51,7 @@ export function userSagemaker({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
index 0f184461a0..7d5e420beb 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sea-tunnel.ts
@@ -82,6 +82,7 @@ export function useSeaTunnel({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts
index 3808d0d633..8026363e13 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-shell.ts
@@ -54,6 +54,7 @@ export function useShell({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
index 9a1d0aa1df..9d74dcffb3 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-spark.ts
@@ -59,6 +59,7 @@ export function useSpark({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts
index 8e235ac3d0..572435bf53 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sql.ts
@@ -59,6 +59,7 @@ export function useSql({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts
index cc75e37bf1..b6294be1af 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-sqoop.ts
@@ -74,6 +74,7 @@ export function useSqoop({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
index 0f6bbeeeab..a3419f683e 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-zeppelin.ts
@@ -51,6 +51,7 @@ export function useZeppelin({
       Fields.useName(from),
       ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
       Fields.useRunFlag(),
+      Fields.useCache(),
       Fields.useDescription(),
       Fields.useTaskPriority(),
       Fields.useWorkerGroup(),
diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
index 8a0057816c..282948345b 100644
--- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
+++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
@@ -434,6 +434,7 @@ interface INodeData
   cpuQuota?: number
   memoryMax?: number
   flag?: 'YES' | 'NO'
+  isCache?: boolean
   taskGroupId?: number
   taskGroupPriority?: number
   taskPriority?: string
@@ -465,10 +466,11 @@ interface INodeData
 interface ITaskData
   extends Omit<
     INodeData,
-    'timeoutFlag' | 'taskPriority' | 'timeoutNotifyStrategy'
+    'isCache' | 'timeoutFlag' | 'taskPriority' | 'timeoutNotifyStrategy'
   > {
   name?: string
   taskPriority?: string
+  isCache?: 'YES' | 'NO'
   timeoutFlag?: 'OPEN' | 'CLOSE'
   timeoutNotifyStrategy?: string | []
   taskParams?: ITaskParams
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx
index 87c5db3b28..be27c88ac9 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx
@@ -63,7 +63,7 @@ const props = {
 export default defineComponent({
   name: 'dag-context-menu',
   props,
-  emits: ['hide', 'start', 'edit', 'viewLog', 'copyTask', 'removeTasks', 'executeTask'],
+  emits: ['hide', 'start', 'edit', 'viewLog', 'copyTask', 'removeTasks', 'executeTask', 'removeTaskInstanceCache'],
   setup(props, ctx) {
     const graph = inject('graph', ref())
     const route = useRoute()
@@ -103,6 +103,14 @@ export default defineComponent({
       }
     }
 
+    // Emit `removeTaskInstanceCache` with the id of `props.taskInstance`;
+    // nothing is emitted when no task instance is set.
+    const handleRemoveTaskInstanceCache = () => {
+      const instance = props.taskInstance
+      if (!instance) return
+      ctx.emit('removeTaskInstanceCache', instance.id)
+    }
+
     const handleCopy = () => {
       const genNums = 1
       const type = props.cell?.data.taskType
@@ -138,7 +144,8 @@ export default defineComponent({
       handleViewLog,
       handleExecuteTaskOnly,
       handleExecuteTaskPOST,
-      handleExecuteTaskPRE
+      handleExecuteTaskPRE,
+      handleRemoveTaskInstanceCache
     }
   },
   render() {
@@ -181,12 +188,20 @@ export default defineComponent({
             </>
           )}
           {this.taskInstance && (
-              <NButton
-                  class={`${styles['menu-item']}`}
-                  onClick={this.handleViewLog}
-              >
-                {t('project.node.view_log')}
-              </NButton>
+              <>
+                <NButton
+                    class={`${styles['menu-item']}`}
+                    onClick={this.handleViewLog}
+                >
+                  {t('project.node.view_log')}
+                </NButton>
+                <NButton
+                    class={`${styles['menu-item']}`}
+                    onClick={this.handleRemoveTaskInstanceCache}
+                >
+                  {t('project.task.remove_task_cache')}
+                </NButton>
+              </>
           )}
           {this.executeTaskDisplay && (
               <>
diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx
index 98e388220b..45d9d907de 100644
--- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx
+++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx
@@ -56,6 +56,7 @@ import { useAsyncState } from '@vueuse/core'
 import utils from '@/utils'
 import { useUISettingStore } from '@/store/ui-setting/ui-setting'
 import { executeTask } from '@/service/modules/executors'
+import { removeTaskInstanceCache } from '@/service/modules/task-instances'
 
 const props = {
   // If this prop is passed, it means from definition detail
@@ -305,6 +306,15 @@ export default defineComponent({
         })
     }
 
+    // Call the remove-cache service with the current project code and the
+    // given task id, then show a success message once the request resolves.
+    const handleRemoveTaskInstanceCache = (taskId: number) => {
+      const notifySuccess = () => {
+        window.$message.success(t('project.workflow.success'))
+      }
+      removeTaskInstanceCache(props.projectCode, taskId).then(notifySuccess)
+    }
+
     const downloadLogs = () => {
       utils.downloadFile('log/download-log', {
         taskInstanceId: nodeVariables.logTaskId
@@ -418,6 +425,7 @@ export default defineComponent({
           onRemoveTasks={removeTasks}
           onViewLog={handleViewLog}
           onExecuteTask={handleExecuteTask}
+          onRemoveTaskInstanceCache={handleRemoveTaskInstanceCache}
         />
         {!!props.definition && (
           <StartModal