You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/03 15:13:19 UTC

[dolphinscheduler] branch dev updated: [Feature-6988][MasterServer] add cache manager for workflow (#7090)

This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new ba2b2a6  [Feature-6988][MasterServer] add cache manager for workflow (#7090)
ba2b2a6 is described below

commit ba2b2a67c2ad9156dc7311937fa88bbf23e60ba7
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Fri Dec 3 23:13:11 2021 +0800

    [Feature-6988][MasterServer] add cache manager for workflow (#7090)
    
    * add cache manager for workflow
    
    * [DS-6988][MasterServer] add cache manager for workflow
    
    * cache evict code optimization
    
    * test
    
    Co-authored-by: caishunfeng <53...@qq.com>
---
 .../api/aspect/CacheEvictAspect.java               | 102 ++++++++++++++++
 .../service/impl/ProcessDefinitionServiceImpl.java | 128 ++++++++++----------
 .../api/service/impl/QueueServiceImpl.java         |   8 --
 .../api/service/impl/TenantServiceImpl.java        |  12 --
 .../api/service/impl/UsersServiceImpl.java         |  14 ---
 .../api/service/QueueServiceTest.java              |  58 ++++-----
 .../api/service/TenantServiceTest.java             |  12 +-
 .../api/service/UsersServiceTest.java              |   4 -
 .../dolphinscheduler/common/enums/CacheType.java   |  19 ++-
 .../dao/datasource/SpringConnectionFactory.java    |   2 +
 .../dao/mapper/ProcessDefinitionLogMapper.java     |   5 +
 .../dao/mapper/ProcessDefinitionMapper.java        |  15 ++-
 .../dao/mapper/ProcessTaskRelationMapper.java      |  13 ++
 .../dao/mapper/TaskDefinitionLogMapper.java        |  13 ++
 .../dolphinscheduler/dao/mapper/TenantMapper.java  |  29 ++++-
 .../dolphinscheduler/dao/mapper/UserMapper.java    |  46 ++++++-
 .../server/master/processor/CacheProcessor.java    | 132 +++++++++++++++++++--
 .../master/runner/MasterSchedulerService.java      |  13 +-
 .../master/runner/WorkflowExecuteThread.java       |  28 +++--
 .../src/main/resources/application-master.yaml     |   7 +-
 .../master/processor/CacheProcessorTest.java       |  19 ++-
 .../cache/{service => }/CacheNotifyService.java    |   2 +-
 .../CacheKeyGenerator.java}                        |  20 +++-
 .../{service => }/impl/CacheNotifyServiceImpl.java |  29 +++--
 .../cache/processor/BaseCacheProcessor.java        |  22 ----
 .../cache/processor/QueueCacheProcessor.java       |  22 ----
 .../cache/processor/TenantCacheProcessor.java      |  26 ----
 .../processor/impl/CacheProcessorFactory.java      |  58 ---------
 .../processor/impl/QueueCacheProcessorImpl.java    |  50 --------
 .../processor/impl/TenantCacheProcessorImpl.java   |  64 ----------
 .../processor/impl/UserCacheProcessorImpl.java     |  59 ---------
 .../service/process/ProcessService.java            |  39 +++---
 .../service/cache/CacheNotifyServiceTest.java      |   2 +-
 .../cache/processor/QueueCacheProcessorTest.java   |  60 ----------
 .../cache/processor/TenantCacheProcessorTest.java  |  78 ------------
 .../cache/processor/UserCacheProxyTest.java        |  76 ------------
 .../service/process/ProcessServiceTest.java        |  30 +++--
 37 files changed, 543 insertions(+), 773 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
new file mode 100644
index 0000000..a989b6c
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.aspect;
+
+import org.apache.dolphinscheduler.common.enums.CacheType;
+import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
+import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
+import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
+
+import java.lang.reflect.Method;
+
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.stereotype.Component;
+
+/**
+ * Aspect that notifies the master servers to expire a cache entry after a {@code @CacheEvict} method runs.
+ */
+@Aspect
+@Component
+public class CacheEvictAspect {
+
+    private static final String UPDATE_BY_ID = "updateById";
+
+    @Autowired
+    private CacheKeyGenerator cacheKeyGenerator;
+
+    @Autowired
+    private CacheNotifyService cacheNotifyService;
+
+    @Pointcut("@annotation(org.springframework.cache.annotation.CacheEvict)")
+    public void cacheEvictPointCut() {
+        // Intentionally empty: this method only carries the pointcut expression.
+    }
+
+    /**
+     * Runs the advised method first, then sends the masters either the updated object or the generated key.
+     * @param proceedingJoinPoint join point of the {@code @CacheEvict} method
+     * @return whatever the advised method returned
+     * @throws Throwable anything thrown by the advised method or by the notification
+     */
+    @Around("cacheEvictPointCut()")
+    public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+        Method method = ((MethodSignature) proceedingJoinPoint.getSignature()).getMethod();
+        Object[] args = proceedingJoinPoint.getArgs();
+        Object result = proceedingJoinPoint.proceed();
+        CacheType cacheType = getCacheType(method.getDeclaringClass().getAnnotation(CacheConfig.class),
+                method.getAnnotation(CacheEvict.class));
+        if (cacheType != null) {
+            // todo: deriving the payload with SpEL would be better than matching on the method name
+            boolean sendEntity = method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length == 1;
+            Object payload = sendEntity ? args[0] : cacheKeyGenerator.generate(proceedingJoinPoint.getTarget(), method, args);
+            cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, payload).convert2Command());
+        }
+        return result;
+    }
+
+    /**
+     * Resolves the cache type: a name on {@code @CacheEvict} wins, the class-level {@code @CacheConfig} is the fallback.
+     * @param cacheConfig annotation of the declaring class, may be {@code null}
+     * @param cacheEvict annotation of the advised method, may be {@code null}
+     * @return the {@link CacheType} whose cache name matches, or {@code null} if no name is set or none matches
+     */
+    private CacheType getCacheType(CacheConfig cacheConfig, CacheEvict cacheEvict) {
+        String[] names = {};
+        if (cacheEvict != null) {
+            // plain reflection does not merge the value/cacheNames aliases, so look at both attributes
+            names = cacheEvict.cacheNames().length > 0 ? cacheEvict.cacheNames() : cacheEvict.value();
+        }
+        if (names.length == 0 && cacheConfig != null) {
+            names = cacheConfig.cacheNames();
+        }
+        String cacheName = names.length > 0 ? names[0] : null;
+        for (CacheType cacheType : CacheType.values()) {
+            if (cacheType.getCacheName().equals(cacheName)) {
+                return cacheType;
+            }
+        }
+        return null;
+    }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index e6a20b0..c447874 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -168,15 +168,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * create process definition
      *
-     * @param loginUser          login user
-     * @param projectCode        project code
-     * @param name               process definition name
-     * @param description        description
-     * @param globalParams       global params
-     * @param locations          locations for nodes
-     * @param timeout            timeout
-     * @param tenantCode         tenantCode
-     * @param taskRelationJson   relation json for nodes
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param name process definition name
+     * @param description description
+     * @param globalParams global params
+     * @param locations locations for nodes
+     * @param timeout timeout
+     * @param tenantCode tenantCode
+     * @param taskRelationJson relation json for nodes
      * @param taskDefinitionJson taskDefinitionJson
      * @return create result code
      */
@@ -338,7 +338,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * query process definition list
      *
-     * @param loginUser   login user
+     * @param loginUser login user
      * @param projectCode project code
      * @return definition list
      */
@@ -360,7 +360,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * query process definition simple list
      *
-     * @param loginUser   login user
+     * @param loginUser login user
      * @param projectCode project code
      * @return definition simple list
      */
@@ -390,12 +390,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * query process definition list paging
      *
-     * @param loginUser   login user
+     * @param loginUser login user
      * @param projectCode project code
-     * @param searchVal   search value
-     * @param userId      user id
-     * @param pageNo      page number
-     * @param pageSize    page size
+     * @param searchVal search value
+     * @param userId user id
+     * @param pageNo page number
+     * @param pageSize page size
      * @return process definition page
      */
     @Override
@@ -433,9 +433,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * query detail of process definition
      *
-     * @param loginUser   login user
+     * @param loginUser login user
      * @param projectCode project code
-     * @param code        process definition code
+     * @param code process definition code
      * @return process definition detail
      */
     @Override
@@ -485,16 +485,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * update  process definition
      *
-     * @param loginUser          login user
-     * @param projectCode        project code
-     * @param name               process definition name
-     * @param code               process definition code
-     * @param description        description
-     * @param globalParams       global params
-     * @param locations          locations for nodes
-     * @param timeout            timeout
-     * @param tenantCode         tenantCode
-     * @param taskRelationJson   relation json for nodes
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param name process definition name
+     * @param code process definition code
+     * @param description description
+     * @param globalParams global params
+     * @param locations locations for nodes
+     * @param timeout timeout
+     * @param tenantCode tenantCode
+     * @param taskRelationJson relation json for nodes
      * @param taskDefinitionJson taskDefinitionJson
      * @return update result code
      */
@@ -605,9 +605,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * verify process definition name unique
      *
-     * @param loginUser   login user
+     * @param loginUser login user
      * @param projectCode project code
-     * @param name        name
+     * @param name name
      * @return true if process definition name not exists, otherwise false
      */
     @Override
@@ -630,9 +630,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * delete process definition by code
      *
-     * @param loginUser   login user
+     * @param loginUser login user
      * @param projectCode project code
-     * @param code        process definition code
+     * @param code process definition code
      * @return delete result code
      */
     @Override
@@ -700,9 +700,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * release process definition: online / offline
      *
-     * @param loginUser    login user
-     * @param projectCode  project code
-     * @param code         process definition code
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param code process definition code
      * @param releaseState release state
      * @return release result code
      */
@@ -841,9 +841,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * import process definition
      *
-     * @param loginUser   login user
+     * @param loginUser login user
      * @param projectCode project code
-     * @param file        process metadata json file
+     * @param file process metadata json file
      * @return import process
      */
     @Override
@@ -1051,9 +1051,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * get task node details based on process definition
      *
-     * @param loginUser   loginUser
+     * @param loginUser loginUser
      * @param projectCode project code
-     * @param code        process definition code
+     * @param code process definition code
      * @return task node list
      */
     @Override
@@ -1080,9 +1080,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * get task node details map based on process definition
      *
-     * @param loginUser   loginUser
+     * @param loginUser loginUser
      * @param projectCode project code
-     * @param codes       define codes
+     * @param codes define codes
      * @return task node list
      */
     @Override
@@ -1124,7 +1124,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * query process definition all by project code
      *
-     * @param loginUser   loginUser
+     * @param loginUser loginUser
      * @param projectCode project code
      * @return process definitions in the project
      */
@@ -1226,11 +1226,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
                         if (taskInstance.isSubProcess()) {
                             TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode());
                             subProcessCode = Integer.parseInt(JSONUtils.parseObject(
-                                taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());
+                                    taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());
                         }
                         treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(),
-                            taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
-                            taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
+                                taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
+                                taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
                     }
                 }
                 for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
@@ -1291,9 +1291,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * batch copy process definition
      *
-     * @param loginUser         loginUser
-     * @param projectCode       projectCode
-     * @param codes             processDefinitionCodes
+     * @param loginUser loginUser
+     * @param projectCode projectCode
+     * @param codes processDefinitionCodes
      * @param targetProjectCode targetProjectCode
      */
     @Override
@@ -1314,9 +1314,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * batch move process definition
      *
-     * @param loginUser         loginUser
-     * @param projectCode       projectCode
-     * @param codes             processDefinitionCodes
+     * @param loginUser loginUser
+     * @param projectCode projectCode
+     * @param codes processDefinitionCodes
      * @param targetProjectCode targetProjectCode
      */
     @Override
@@ -1415,10 +1415,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * switch the defined process definition version
      *
-     * @param loginUser   login user
+     * @param loginUser login user
      * @param projectCode project code
-     * @param code        process definition code
-     * @param version     the version user want to switch
+     * @param code process definition code
+     * @param version the version user want to switch
      * @return switch process definition version result code
      */
     @Override
@@ -1454,11 +1454,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * check batch operate result
      *
-     * @param srcProjectCode    srcProjectCode
+     * @param srcProjectCode srcProjectCode
      * @param targetProjectCode targetProjectCode
-     * @param result            result
+     * @param result result
      * @param failedProcessList failedProcessList
-     * @param isCopy            isCopy
+     * @param isCopy isCopy
      */
     private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode,
                                          Map<String, Object> result, List<String> failedProcessList, boolean isCopy) {
@@ -1476,11 +1476,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * query the pagination versions info by one certain process definition code
      *
-     * @param loginUser   login user info to check auth
+     * @param loginUser login user info to check auth
      * @param projectCode project code
-     * @param pageNo      page number
-     * @param pageSize    page size
-     * @param code        process definition code
+     * @param pageNo page number
+     * @param pageSize page size
+     * @param code process definition code
      * @return the pagination process definition versions info of the certain process definition
      */
     @Override
@@ -1510,10 +1510,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
     /**
      * delete one certain process definition by version number and process definition code
      *
-     * @param loginUser   login user info to check auth
+     * @param loginUser login user info to check auth
      * @param projectCode project code
-     * @param code        process definition code
-     * @param version     version number
+     * @param code process definition code
+     * @param version version number
      * @return delete result code
      */
     @Override
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
index 14c57cc..2da89df 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
@@ -22,13 +22,10 @@ import org.apache.dolphinscheduler.api.service.QueueService;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CacheType;
 import org.apache.dolphinscheduler.dao.entity.Queue;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
 
 import org.apache.commons.lang.StringUtils;
 
@@ -59,9 +56,6 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
     @Autowired
     private UserMapper userMapper;
 
-    @Autowired
-    private CacheNotifyService cacheNotifyService;
-
     /**
      * query queue list
      *
@@ -229,8 +223,6 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
 
         queueMapper.updateById(queueObj);
 
-        cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.QUEUE, queueObj).convert2Command());
-
         putMsg(result, Status.SUCCESS);
 
         return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 58f6bbc..0601725 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.RegexUtils;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CacheType;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -34,8 +33,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
@@ -70,9 +67,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
     @Autowired
     private UserMapper userMapper;
 
-    @Autowired
-    private CacheNotifyService cacheNotifyService;
-
     /**
      * create tenant
      *
@@ -220,9 +214,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
         tenant.setUpdateTime(now);
         tenantMapper.updateById(tenant);
 
-        // notify master to expire cache
-        cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command());
-
         result.put(Constants.STATUS, Status.SUCCESS);
         result.put(Constants.MSG, Status.SUCCESS.getMsg());
         return result;
@@ -282,9 +273,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
         tenantMapper.deleteById(id);
         processInstanceMapper.updateProcessInstanceByTenantId(id, -1);
 
-        // notify master to expire cache
-        cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command());
-
         putMsg(result, Status.SUCCESS);
         return result;
     }
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index ff5dc7d..5199d4f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.api.utils.CheckUtils;
 import org.apache.dolphinscheduler.api.utils.PageInfo;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CacheType;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.utils.EncryptionUtils;
@@ -53,8 +52,6 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
-import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -121,10 +118,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
     @Autowired
     private ProjectMapper projectMapper;
 
-    @Autowired
-    private CacheNotifyService cacheNotifyService;
-
-
     /**
      * create user, only system admin have permission
      *
@@ -479,7 +472,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
 
         // updateProcessInstance user
         userMapper.updateById(user);
-        cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command());
 
         putMsg(result, Status.SUCCESS);
         return result;
@@ -531,10 +523,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
 
         userMapper.deleteById(id);
 
-        if (user != null) {
-            cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command());
-        }
-
         putMsg(result, Status.SUCCESS);
 
         return result;
@@ -1079,8 +1067,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
         user.setUpdateTime(now);
         userMapper.updateById(user);
 
-        cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command());
-
         User responseUser = userMapper.queryByUserNameAccurately(userName);
         putMsg(result, Status.SUCCESS);
         result.put(Constants.DATA_LIST, responseUser);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
index 7f1a2ed..f3167a5 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.dao.entity.Queue;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
 
 import org.apache.commons.collections.CollectionUtils;
 
@@ -62,9 +61,6 @@ public class QueueServiceTest {
     private QueueServiceImpl queueService;
 
     @Mock
-    private CacheNotifyService cacheNotifyService;
-
-    @Mock
     private QueueMapper queueMapper;
 
     @Mock
@@ -77,7 +73,7 @@ public class QueueServiceTest {
     }
 
     @After
-    public void after(){
+    public void after() {
     }
 
     @Test
@@ -86,7 +82,7 @@ public class QueueServiceTest {
         Mockito.when(queueMapper.selectList(null)).thenReturn(getQueueList());
         Map<String, Object> result = queueService.queryList(getLoginUser());
         logger.info(result.toString());
-        List<Queue> queueList  = (List<Queue>) result.get(Constants.DATA_LIST);
+        List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
         Assert.assertTrue(CollectionUtils.isNotEmpty(queueList));
 
     }
@@ -94,13 +90,13 @@ public class QueueServiceTest {
     @Test
     public void testQueryListPage() {
 
-        IPage<Queue> page = new Page<>(1,10);
+        IPage<Queue> page = new Page<>(1, 10);
         page.setTotal(1L);
         page.setRecords(getQueueList());
         Mockito.when(queueMapper.queryQueuePaging(Mockito.any(Page.class), Mockito.eq(queueName))).thenReturn(page);
-        Result result = queueService.queryList(getLoginUser(),queueName,1,10);
+        Result result = queueService.queryList(getLoginUser(), queueName, 1, 10);
         logger.info(result.toString());
-        PageInfo<Queue>  pageInfo = (PageInfo<Queue>) result.getData();
+        PageInfo<Queue> pageInfo = (PageInfo<Queue>) result.getData();
         Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getTotalList()));
     }
 
@@ -108,17 +104,17 @@ public class QueueServiceTest {
     public void testCreateQueue() {
 
         // queue is null
-        Map<String, Object> result = queueService.createQueue(getLoginUser(),null,queueName);
+        Map<String, Object> result = queueService.createQueue(getLoginUser(), null, queueName);
         logger.info(result.toString());
-        Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS));
+        Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
         // queueName is null
-        result = queueService.createQueue(getLoginUser(),queueName,null);
+        result = queueService.createQueue(getLoginUser(), queueName, null);
         logger.info(result.toString());
-        Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS));
+        Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
         // correct
-        result = queueService.createQueue(getLoginUser(),queueName,queueName);
+        result = queueService.createQueue(getLoginUser(), queueName, queueName);
         logger.info(result.toString());
-        Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
+        Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
 
     }
 
@@ -130,25 +126,25 @@ public class QueueServiceTest {
         Mockito.when(queueMapper.existQueue(null, "test")).thenReturn(true);
 
         // not exist
-        Map<String, Object> result = queueService.updateQueue(getLoginUser(),0,"queue",queueName);
+        Map<String, Object> result = queueService.updateQueue(getLoginUser(), 0, "queue", queueName);
         logger.info(result.toString());
-        Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+        Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
         //no need update
-        result = queueService.updateQueue(getLoginUser(),1,queueName,queueName);
+        result = queueService.updateQueue(getLoginUser(), 1, queueName, queueName);
         logger.info(result.toString());
-        Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+        Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
         //queue exist
-        result = queueService.updateQueue(getLoginUser(),1,"test",queueName);
+        result = queueService.updateQueue(getLoginUser(), 1, "test", queueName);
         logger.info(result.toString());
-        Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+        Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
         // queueName exist
-        result = queueService.updateQueue(getLoginUser(),1,"test1","test");
+        result = queueService.updateQueue(getLoginUser(), 1, "test1", "test");
         logger.info(result.toString());
-        Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+        Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
         //success
-        result = queueService.updateQueue(getLoginUser(),1,"test1","test1");
+        result = queueService.updateQueue(getLoginUser(), 1, "test1", "test1");
         logger.info(result.toString());
-        Assert.assertEquals(Status.SUCCESS.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+        Assert.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
 
     }
 
@@ -159,27 +155,27 @@ public class QueueServiceTest {
         Mockito.when(queueMapper.existQueue(null, queueName)).thenReturn(true);
 
         //queue null
-        Result result = queueService.verifyQueue(null,queueName);
+        Result result = queueService.verifyQueue(null, queueName);
         logger.info(result.toString());
         Assert.assertEquals(result.getCode().intValue(), Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode());
 
         //queueName null
-        result = queueService.verifyQueue(queueName,null);
+        result = queueService.verifyQueue(queueName, null);
         logger.info(result.toString());
         Assert.assertEquals(result.getCode().intValue(), Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode());
 
         //exist queueName
-        result = queueService.verifyQueue(queueName,queueName);
+        result = queueService.verifyQueue(queueName, queueName);
         logger.info(result.toString());
         Assert.assertEquals(result.getCode().intValue(), Status.QUEUE_NAME_EXIST.getCode());
 
         //exist queue
-        result = queueService.verifyQueue(queueName,"test");
+        result = queueService.verifyQueue(queueName, "test");
         logger.info(result.toString());
         Assert.assertEquals(result.getCode().intValue(), Status.QUEUE_VALUE_EXIST.getCode());
 
         // success
-        result = queueService.verifyQueue("test","test");
+        result = queueService.verifyQueue("test", "test");
         logger.info(result.toString());
         Assert.assertEquals(result.getCode().intValue(), Status.SUCCESS.getCode());
 
@@ -187,7 +183,6 @@ public class QueueServiceTest {
 
     /**
      * create admin user
-     * @return
      */
     private User getLoginUser() {
 
@@ -205,7 +200,6 @@ public class QueueServiceTest {
 
     /**
      * get queue
-     * @return
      */
     private Queue getQueue() {
         Queue queue = new Queue();
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
index d9e1e97..e1c00d2 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
@@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
 
 import org.apache.commons.collections.CollectionUtils;
 
@@ -64,9 +63,6 @@ public class TenantServiceTest {
     private TenantServiceImpl tenantService;
 
     @Mock
-    private CacheNotifyService cacheNotifyService;
-
-    @Mock
     private TenantMapper tenantMapper;
 
     @Mock
@@ -88,7 +84,7 @@ public class TenantServiceTest {
         try {
             //check tenantCode
             Map<String, Object> result =
-                tenantService.createTenant(getLoginUser(), "%!1111", 1, "TenantServiceTest");
+                    tenantService.createTenant(getLoginUser(), "%!1111", 1, "TenantServiceTest");
             logger.info(result.toString());
             Assert.assertEquals(Status.CHECK_OS_TENANT_CODE_ERROR, result.get(Constants.STATUS));
 
@@ -116,7 +112,7 @@ public class TenantServiceTest {
         page.setRecords(getList());
         page.setTotal(1L);
         Mockito.when(tenantMapper.queryTenantPaging(Mockito.any(Page.class), Mockito.eq("TenantServiceTest")))
-            .thenReturn(page);
+                .thenReturn(page);
         Result result = tenantService.queryTenantList(getLoginUser(), "TenantServiceTest", 1, 10);
         logger.info(result.toString());
         PageInfo<Tenant> pageInfo = (PageInfo<Tenant>) result.getData();
@@ -131,7 +127,7 @@ public class TenantServiceTest {
         try {
             // id not exist
             Map<String, Object> result =
-                tenantService.updateTenant(getLoginUser(), 912222, tenantCode, 1, "desc");
+                    tenantService.updateTenant(getLoginUser(), 912222, tenantCode, 1, "desc");
             logger.info(result.toString());
             // success
             Assert.assertEquals(Status.TENANT_NOT_EXIST, result.get(Constants.STATUS));
@@ -150,7 +146,7 @@ public class TenantServiceTest {
 
         Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
         Mockito.when(processInstanceMapper.queryByTenantIdAndStatus(1, Constants.NOT_TERMINATED_STATES))
-            .thenReturn(getInstanceList());
+                .thenReturn(getInstanceList());
         Mockito.when(processDefinitionMapper.queryDefinitionListByTenant(2)).thenReturn(getDefinitionsList());
         Mockito.when(userMapper.queryUserListByTenant(3)).thenReturn(getUserList());
 
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
index 01db35b..e586db8 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
@@ -43,7 +43,6 @@ import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -80,9 +79,6 @@ public class UsersServiceTest {
     private UsersServiceImpl usersService;
 
     @Mock
-    private CacheNotifyService cacheNotifyService;
-
-    @Mock
     private UserMapper userMapper;
 
     @Mock
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
index db31eea..f1921db 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
@@ -18,7 +18,20 @@
 package org.apache.dolphinscheduler.common.enums;
 
 public enum CacheType {
-    TENANT,
-    USER,
-    QUEUE;
+    TENANT("tenant"),
+    USER("user"),
+    QUEUE("queue"),
+    PROCESS_DEFINITION("processDefinition"),
+    PROCESS_TASK_RELATION("processTaskRelation"),
+    TASK_DEFINITION("taskDefinition");
+
+    CacheType(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    private final String cacheName;
+
+    public String getCacheName() {
+        return cacheName;
+    }
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
index 5381495..2b4f99c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
@@ -42,6 +42,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
 
 @Configuration
 public class SpringConnectionFactory {
+
     @Bean
     public PaginationInterceptor paginationInterceptor() {
         return new PaginationInterceptor();
@@ -60,6 +61,7 @@ public class SpringConnectionFactory {
         configuration.setCallSettersOnNulls(true);
         configuration.setJdbcTypeForNull(JdbcType.NULL);
         configuration.addInterceptor(paginationInterceptor());
+
         configuration.setGlobalConfig(new GlobalConfig().setBanner(false));
         MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
         sqlSessionFactoryBean.setConfiguration(configuration);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
index 51ccb73..0e03b50 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
@@ -23,6 +23,9 @@ import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
 
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.Cacheable;
+
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -30,6 +33,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 /**
  * process definition log mapper interface
  */
+@CacheConfig(cacheNames = "processDefinition")
 public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinitionLog> {
 
     /**
@@ -66,6 +70,7 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
      * @param version version number
      * @return the process definition version info
      */
+    @Cacheable(sync = true, key = "#p0 + '_' + #p1")
     ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code,  @Param("version") int version);
 
     /**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index 3a731ea..ca287e2 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
 import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
 
 import org.apache.ibatis.annotations.MapKey;
 import org.apache.ibatis.annotations.Param;
@@ -28,13 +27,17 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 
 /**
  * process definition mapper interface
  */
+@CacheConfig(cacheNames = "processDefinition", keyGenerator = "cacheKeyGenerator")
 public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
 
     /**
@@ -43,9 +46,16 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
      * @param code code
      * @return process definition
      */
+    @Cacheable(sync = true)
     ProcessDefinition queryByCode(@Param("code") long code);
 
     /**
+     * update
+     */
+    @CacheEvict
+    int updateById(@Param("et") ProcessDefinition processDefinition);
+
+    /**
      * query process definition by code list
      *
      * @param codes codes
@@ -59,6 +69,7 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
      * @param code code
      * @return delete result
      */
+    @CacheEvict
     int deleteByCode(@Param("code") long code);
 
     /**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 5b74f46..5b3ea75 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -25,11 +25,16 @@ import org.apache.ibatis.annotations.Param;
 import java.util.List;
 import java.util.Map;
 
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 
 /**
  * process task relation mapper interface
  */
+@CacheConfig(cacheNames = "processTaskRelation", keyGenerator = "cacheKeyGenerator")
 public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelation> {
 
     /**
@@ -39,10 +44,17 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
      * @param processCode processCode
      * @return ProcessTaskRelation list
      */
+    @Cacheable(sync = true)
     List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") long projectCode,
                                                  @Param("processCode") long processCode);
 
     /**
+     * update
+     */
+    @CacheEvict
+    int updateById(@Param("et") ProcessTaskRelation processTaskRelation);
+
+    /**
      * process task relation by taskCode
      *
      * @param taskCodes taskCode list
@@ -65,6 +77,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
      * @param processCode processCode
      * @return int
      */
+    @CacheEvict
     int deleteByCode(@Param("projectCode") long projectCode,
                      @Param("processCode") long processCode);
 
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
index ab2620f..75e1b29 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
@@ -25,6 +25,10 @@ import org.apache.ibatis.annotations.Param;
 import java.util.Collection;
 import java.util.List;
 
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -32,6 +36,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 /**
  * task definition log mapper interface
  */
+@CacheConfig(cacheNames = "taskDefinition", keyGenerator = "cacheKeyGenerator")
 public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
 
     /**
@@ -48,10 +53,17 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
      * @param version version
      * @return task definition log
      */
+    @Cacheable(sync = true, key = "#p0 + '_' + #p1")
     TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code,
                                                       @Param("version") int version);
 
     /**
+     * update
+     */
+    @CacheEvict
+    int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog);
+
+    /**
      * @param taskDefinitions taskDefinition list
      * @return list
      */
@@ -72,6 +84,7 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
      * @param version task definition version
      * @return delete result
      */
+    @CacheEvict
     int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int version);
 
     /**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
index 89b6237..843122f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
@@ -14,29 +14,50 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.dao.entity.Tenant;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
+
 import org.apache.ibatis.annotations.Param;
 
-import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
 
 /**
  * tenant mapper interface
  */
+@CacheConfig(cacheNames = "tenant", keyGenerator = "cacheKeyGenerator")
 public interface TenantMapper extends BaseMapper<Tenant> {
 
     /**
      * query tenant by id
+     *
      * @param tenantId tenantId
      * @return tenant
      */
+    @Cacheable(sync = true)
     Tenant queryById(@Param("tenantId") int tenantId);
 
     /**
+     * delete by id
+     */
+    @CacheEvict
+    int deleteById(int id);
+
+    /**
+     * update
+     */
+    @CacheEvict
+    int updateById(@Param("et") Tenant tenant);
+
+    /**
      * query tenant by code
+     *
      * @param tenantCode tenantCode
      * @return tenant
      */
@@ -44,6 +65,7 @@ public interface TenantMapper extends BaseMapper<Tenant> {
 
     /**
      * tenant page
+     *
      * @param page page
      * @param searchVal searchVal
      * @return tenant IPage
@@ -53,6 +75,7 @@ public interface TenantMapper extends BaseMapper<Tenant> {
 
     /**
      * check tenant exist
+     *
      * @param tenantCode tenantCode
      * @return true if exist else return null
      */
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
index 4418363..20fafdc 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
@@ -14,29 +14,57 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.dao.entity.User;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
 import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
 
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
 /**
  * user mapper interface
  */
+@CacheConfig(cacheNames = "user", keyGenerator = "cacheKeyGenerator")
 public interface UserMapper extends BaseMapper<User> {
 
     /**
+     * select by user id
+     */
+    @Cacheable(sync = true)
+    User selectById(int id);
+
+    /**
+     * delete by id
+     */
+    @CacheEvict
+    int deleteById(int id);
+
+    /**
+     * update
+     */
+    @CacheEvict
+    int updateById(@Param("et") User user);
+
+    /**
      * query all general user
+     *
      * @return user list
      */
     List<User> queryAllGeneralUser();
 
     /**
      * query user by name
+     *
      * @param userName userName
      * @return user
      */
@@ -44,6 +72,7 @@ public interface UserMapper extends BaseMapper<User> {
 
     /**
      * query user by userName and password
+     *
      * @param userName userName
      * @param password password
      * @return user
@@ -53,6 +82,7 @@ public interface UserMapper extends BaseMapper<User> {
 
     /**
      * user page
+     *
      * @param page page
      * @param userName userName
      * @return user IPage
@@ -62,6 +92,7 @@ public interface UserMapper extends BaseMapper<User> {
 
     /**
      * query user detail by id
+     *
      * @param userId userId
      * @return user
      */
@@ -69,6 +100,7 @@ public interface UserMapper extends BaseMapper<User> {
 
     /**
      * query user list by alertgroupId
+     *
      * @param alertgroupId alertgroupId
      * @return user list
      */
@@ -76,6 +108,7 @@ public interface UserMapper extends BaseMapper<User> {
 
     /**
      * query user list by tenantId
+     *
      * @param tenantId tenantId
      * @return user list
      */
@@ -83,6 +116,7 @@ public interface UserMapper extends BaseMapper<User> {
 
     /**
      * query user by userId
+     *
      * @param userId userId
      * @return user
      */
@@ -90,6 +124,7 @@ public interface UserMapper extends BaseMapper<User> {
 
     /**
      * query user by token
+     *
      * @param token token
      * @return user
      */
@@ -97,6 +132,7 @@ public interface UserMapper extends BaseMapper<User> {
 
     /**
      * query user by queue name
+     *
      * @param queueName queue name
      * @return user list
      */
@@ -104,13 +140,15 @@ public interface UserMapper extends BaseMapper<User> {
 
     /**
      * check the user exist
-     * @param queueName queue name
+     *
+     * @param queue queue name
      * @return true if exist else return null
      */
     Boolean existUser(@Param("queue") String queue);
 
     /**
      * update user with old queue
+     *
      * @param oldQueue old queue name
      * @param newQueue new queue name
      * @return update rows
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
index 64571b8..778d1ae 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
@@ -17,16 +17,24 @@
 
 package org.apache.dolphinscheduler.server.master.processor;
 
+import org.apache.dolphinscheduler.common.enums.CacheType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
 
 import com.google.common.base.Preconditions;
 
@@ -39,11 +47,7 @@ public class CacheProcessor implements NettyRequestProcessor {
 
     private final Logger logger = LoggerFactory.getLogger(CacheProcessor.class);
 
-    private CacheProcessorFactory cacheProcessorFactory;
-
-    public CacheProcessor() {
-        this.cacheProcessorFactory = SpringApplicationContext.getBean(CacheProcessorFactory.class);
-    }
+    private CacheManager cacheManager;
 
     @Override
     public void process(Channel channel, Command command) {
@@ -53,6 +57,120 @@ public class CacheProcessor implements NettyRequestProcessor {
 
         logger.info("received command : {}", cacheExpireCommand);
 
-        cacheProcessorFactory.getCacheProcessor(cacheExpireCommand.getCacheType()).cacheExpire(cacheExpireCommand.getUpdateObjClass(), cacheExpireCommand.getUpdateObjJson());
+        this.cacheExpire(cacheExpireCommand);
+    }
+
+    private void cacheExpire(CacheExpireCommand cacheExpireCommand) {
+        if (cacheManager == null) {
+            cacheManager = SpringApplicationContext.getBean(CacheManager.class);
+        }
+
+        Object object = JSONUtils.parseObject(cacheExpireCommand.getUpdateObjJson(), cacheExpireCommand.getUpdateObjClass());
+        if (object == null) {
+            return;
+        }
+
+        CacheType cacheType = cacheExpireCommand.getCacheType();
+        switch (cacheType) {
+            case TENANT:
+                if (object instanceof Tenant) {
+                    Tenant tenant = (Tenant) object;
+                    tenantCacheExpire(tenant);
+                }
+                break;
+            case USER:
+                if (object instanceof User) {
+                    User user = (User) object;
+                    userCacheExpire(user);
+                }
+                break;
+            case QUEUE:
+                if (object instanceof Queue) {
+                    Queue queue = (Queue) object;
+                    queueCacheExpire(queue);
+                }
+                break;
+            case PROCESS_DEFINITION:
+                if (object instanceof ProcessDefinition) {
+                    ProcessDefinition processDefinition = (ProcessDefinition) object;
+                    processDefinitionCacheExpire(processDefinition);
+                }
+                break;
+            case TASK_DEFINITION:
+                if (object instanceof TaskDefinition) {
+                    TaskDefinition taskDefinition = (TaskDefinition) object;
+                    taskDefinitionCacheExpire(taskDefinition);
+                }
+                break;
+            case PROCESS_TASK_RELATION:
+                if (object instanceof ProcessTaskRelation) {
+                    ProcessTaskRelation processTaskRelation = (ProcessTaskRelation) object;
+                    processTaskRelationCacheExpire(processTaskRelation);
+                }
+                break;
+            default:
+                logger.error("no support cache type:{}", cacheType);
+        }
+
+        // if delete operation, just send key
+        if (object instanceof String) {
+            Cache cache = cacheManager.getCache(cacheType.getCacheName());
+            if (cache != null) {
+                cache.evict(object);
+                logger.info("cache evict, type:{}, key:{}", cacheType.getCacheName(), object);
+            }
+        }
+    }
+
+    private void tenantCacheExpire(Tenant tenant) {
+        Cache cache = cacheManager.getCache(CacheType.TENANT.getCacheName());
+        if (cache != null) {
+            cache.evict(tenant.getId());
+            logger.info("cache evict, type:{}, key:{}", CacheType.TENANT.getCacheName(), tenant.getId());
+        }
+    }
+
+    private void userCacheExpire(User user) {
+        Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
+        if (cache != null) {
+            cache.evict(user.getId());
+            logger.info("cache evict, type:{}, key:{}", CacheType.USER.getCacheName(), user.getId());
+        }
+    }
+
+    private void queueCacheExpire(Queue queue) {
+        Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
+        if (cache != null) {
+            cache.clear();
+            logger.info("cache evict, type:{}, clear", CacheType.USER.getCacheName());
+        }
+    }
+
+    private void processDefinitionCacheExpire(ProcessDefinition processDefinition) {
+        Cache cache = cacheManager.getCache(CacheType.PROCESS_DEFINITION.getCacheName());
+        if (cache != null) {
+            cache.evict(processDefinition.getCode());
+            cache.evict(processDefinition.getCode() + "_" + processDefinition.getVersion());
+            logger.info("cache evict, type:{}, key:{}",
+                    CacheType.PROCESS_DEFINITION.getCacheName(), processDefinition.getCode() + "_" + processDefinition.getVersion());
+        }
+    }
+
+    private void processTaskRelationCacheExpire(ProcessTaskRelation processTaskRelation) {
+        Cache cache = cacheManager.getCache(CacheType.PROCESS_TASK_RELATION.getCacheName());
+        if (cache != null) {
+            cache.evict(processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode());
+            logger.info("cache evict, type:{}, key:{}",
+                    CacheType.PROCESS_TASK_RELATION.getCacheName(), processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode());
+        }
+    }
+
+    private void taskDefinitionCacheExpire(TaskDefinition taskDefinition) {
+        Cache cache = cacheManager.getCache(CacheType.TASK_DEFINITION.getCacheName());
+        if (cache != null) {
+            cache.evict(taskDefinition.getCode() + "_" + taskDefinition.getVersion());
+            logger.info("cache evict, type:{}, key:{}",
+                    CacheType.TASK_DEFINITION.getCacheName(), taskDefinition.getCode() + "_" + taskDefinition.getVersion());
+        }
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 2b8287b..6e05da9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -118,12 +118,6 @@ public class MasterSchedulerService extends Thread {
      */
     ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
 
-    /**
-     * key:code-version
-     * value: processDefinition
-     */
-    HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new HashMap<>();
-
     private StateWheelExecuteThread stateWheelExecuteThread;
 
     /**
@@ -195,10 +189,6 @@ public class MasterSchedulerService extends Thread {
             return;
         }
 
-        if (!masterConfig.isCacheProcessDefinition() && processDefinitionCacheMaps.size() > 0) {
-            processDefinitionCacheMaps.clear();
-        }
-
         List<ProcessInstance> processInstances = command2ProcessInstance(commands);
         if (CollectionUtils.isEmpty(processInstances)) {
             return;
@@ -245,8 +235,7 @@ public class MasterSchedulerService extends Thread {
                 try {
                     ProcessInstance processInstance = processService.handleCommand(logger,
                             getLocalAddress(),
-                            command,
-                            processDefinitionCacheMaps);
+                            command);
                     if (processInstance != null) {
                         processInstances[index] = processInstance;
                         logger.info("handle command command {} end, create process instance {}",
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 059c371..abc8872 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -737,19 +737,21 @@ public class WorkflowExecuteThread implements Runnable {
         completeTaskMap.clear();
         errorTaskMap.clear();
 
-        List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
-        for (TaskInstance task : validTaskInstanceList) {
-            validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
-            taskInstanceMap.put(task.getId(), task);
-
-            if (task.isTaskComplete()) {
-                completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
-            }
-            if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
-                continue;
-            }
-            if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
-                errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+        if (ExecutionStatus.SUBMITTED_SUCCESS != processInstance.getState()) {
+            List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+            for (TaskInstance task : validTaskInstanceList) {
+                validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                taskInstanceMap.put(task.getId(), task);
+
+                if (task.isTaskComplete()) {
+                    completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                }
+                if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
+                    continue;
+                }
+                if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
+                    errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+                }
             }
         }
 
diff --git a/dolphinscheduler-server/src/main/resources/application-master.yaml b/dolphinscheduler-server/src/main/resources/application-master.yaml
index 86796c1..866b200 100644
--- a/dolphinscheduler-server/src/main/resources/application-master.yaml
+++ b/dolphinscheduler-server/src/main/resources/application-master.yaml
@@ -19,12 +19,15 @@ spring:
     name: master-server
   cache:
-    # default enable cache, you can disable by `type: none`
+    # cache is disabled by default (`type: none`); set `type: caffeine` to enable it
-    type: caffeine
+    type: none
     cache-names:
       - tenant
       - user
+      - processDefinition
+      - processTaskRelation
+      - taskDefinition
     caffeine:
-      spec: maximumSize=100,expireAfterWrite=60s,recordStats
+      spec: maximumSize=100,expireAfterWrite=300s,recordStats
 
 master:
   listen-port: 5678
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
index 02f3216..6f1907c 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
@@ -22,19 +22,17 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
 import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory;
-import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
 
 import io.netty.channel.Channel;
 
@@ -47,21 +45,20 @@ public class CacheProcessorTest {
 
     private CacheProcessor cacheProcessor;
 
-    @InjectMocks
-    private TenantCacheProcessorImpl tenantCacheProcessor;
-
     @Mock
     private Channel channel;
 
     @Mock
-    private CacheProcessorFactory cacheProcessorFactory;
+    private CacheManager cacheManager;
+
+    @Mock
+    private Cache cache;
 
     @Before
     public void before() {
         PowerMockito.mockStatic(SpringApplicationContext.class);
-        PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor);
-        PowerMockito.when(SpringApplicationContext.getBean(CacheProcessorFactory.class)).thenReturn(cacheProcessorFactory);
-        Mockito.when(cacheProcessorFactory.getCacheProcessor(CacheType.TENANT)).thenReturn(tenantCacheProcessor);
+        PowerMockito.when(SpringApplicationContext.getBean(CacheManager.class)).thenReturn(cacheManager); // static Spring lookup hands out the mocked CacheManager
+        Mockito.when(cacheManager.getCache(CacheType.TENANT.getCacheName())).thenReturn(cache); // the tenant cache name resolves to the mocked Cache
         cacheProcessor = new CacheProcessor();
     }
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
similarity index 94%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
index 3b051b6..09c5571 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.cache.service;
+package org.apache.dolphinscheduler.service.cache;
 
 import org.apache.dolphinscheduler.remote.command.Command;
 
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
similarity index 61%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
index 1d93b42..2a03654 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
@@ -15,12 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.cache.processor;
+package org.apache.dolphinscheduler.service.cache.impl;
 
-import org.apache.dolphinscheduler.dao.entity.User;
+import java.lang.reflect.Method;
 
-public interface UserCacheProcessor extends BaseCacheProcessor {
-    void update(int userId);
+import org.springframework.cache.interceptor.KeyGenerator;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
 
-    User selectById(int userId);
+/**
+ * Custom cache key generator: the key is the method arguments joined by "_" (arguments 1 and 2 give "1_2").
+ */
+@Component
+public class CacheKeyGenerator implements KeyGenerator {
+    /** Builds the key from {@code params} only; {@code target} and {@code method} are not used. */
+    @Override
+    public Object generate(Object target, Method method, Object... params) {
+        return StringUtils.arrayToDelimitedString(params, "_");
+    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
similarity index 82%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
index 1aa15c2..ffa9299 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.service.cache.service.impl;
+package org.apache.dolphinscheduler.service.cache.impl;
 
 import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.model.Server;
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
 import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
 import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
+import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -114,19 +114,22 @@ public class CacheNotifyServiceImpl implements CacheNotifyService {
     @Override
     public void notifyMaster(Command command) {
         logger.info("send result, command:{}", command.toString());
+        try { // any exception from the lookup/send loop is logged by the catch below instead of reaching the caller
+            List<Server> serverList = registryClient.getServerList(NodeType.MASTER);
+            if (CollectionUtils.isEmpty(serverList)) {
+                return; // the registry returned no master servers: nothing to notify
+            }
 
-        List<Server> serverList = registryClient.getServerList(NodeType.MASTER);
-        if (CollectionUtils.isEmpty(serverList)) {
-            return;
-        }
-
-        for (Server server : serverList) {
-            Host host = new Host(server.getHost(), server.getPort());
-            NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host);
-            if (nettyRemoteChannel == null) {
-                continue;
+            for (Server server : serverList) {
+                Host host = new Host(server.getHost(), server.getPort());
+                NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host);
+                if (nettyRemoteChannel == null) { // no channel was returned for this master: skip it
+                    continue;
+                }
+                nettyRemoteChannel.writeAndFlush(command); // the same command goes to every master that has a channel
             }
-            nettyRemoteChannel.writeAndFlush(command);
+        } catch (Exception e) {
+            logger.error("notify master error", e);
         }
     }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java
deleted file mode 100644
index 2b0dc8b..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-public interface BaseCacheProcessor {
-    void cacheExpire(Class updateObjClass, String updateObjJson);
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java
deleted file mode 100644
index 4b438eb..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-public interface QueueCacheProcessor extends BaseCacheProcessor {
-    public void expireAllUserCache();
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java
deleted file mode 100644
index c6b8000..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-
-public interface TenantCacheProcessor extends BaseCacheProcessor {
-    void update(int tenantId);
-
-    Tenant queryById(int tenantId);
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java
deleted file mode 100644
index 16a6ccd..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor.impl;
-
-import org.apache.dolphinscheduler.common.enums.CacheType;
-import org.apache.dolphinscheduler.service.cache.processor.BaseCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.PostConstruct;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class CacheProcessorFactory {
-
-    @Autowired
-    private TenantCacheProcessor tenantCacheProcessor;
-
-    @Autowired
-    private UserCacheProcessor userCacheProcessor;
-
-    @Autowired
-    private QueueCacheProcessor queueCacheProcessor;
-
-    Map<CacheType, BaseCacheProcessor> cacheProcessorMap = new ConcurrentHashMap<>();
-
-    @PostConstruct
-    private void init() {
-        cacheProcessorMap.put(CacheType.TENANT, tenantCacheProcessor);
-        cacheProcessorMap.put(CacheType.USER, userCacheProcessor);
-        cacheProcessorMap.put(CacheType.QUEUE, queueCacheProcessor);
-    }
-
-    public BaseCacheProcessor getCacheProcessor(CacheType cacheType) {
-        return cacheProcessorMap.get(cacheType);
-    }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java
deleted file mode 100644
index 174d59f..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor.impl;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Queue;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.stereotype.Component;
-
-@Component
-public class QueueCacheProcessorImpl implements QueueCacheProcessor {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Override
-    @CacheEvict(cacheNames = "user", allEntries = true)
-    public void expireAllUserCache() {
-        // just evict cache
-        logger.debug("expire all user cache");
-    }
-
-    @Override
-    public void cacheExpire(Class updateObjClass, String updateObjJson) {
-        Queue updateQueue = (Queue) JSONUtils.parseObject(updateObjJson, updateObjClass);
-        if (updateQueue == null) {
-            return;
-        }
-        SpringApplicationContext.getBean(QueueCacheProcessor.class).expireAllUserCache();
-    }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java
deleted file mode 100644
index 3ca8014..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor.impl;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cache.annotation.CacheConfig;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.cache.annotation.Cacheable;
-import org.springframework.stereotype.Component;
-
-@Component
-@CacheConfig(cacheNames = "tenant")
-public class TenantCacheProcessorImpl implements TenantCacheProcessor {
-
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-
-    @Autowired
-    private TenantMapper tenantMapper;
-
-    @Override
-    @CacheEvict
-    public void update(int tenantId) {
-        // just evict cache
-    }
-
-    @Override
-    @Cacheable(sync = true)
-    public Tenant queryById(int tenantId) {
-        logger.debug("tenant cache proxy, tenantId:{}", tenantId);
-        return tenantMapper.queryById(tenantId);
-    }
-
-    @Override
-    public void cacheExpire(Class updateObjClass, String updateObjJson) {
-        Tenant updateTenant = (Tenant) JSONUtils.parseObject(updateObjJson, updateObjClass);
-        if (updateTenant == null) {
-            return;
-        }
-        SpringApplicationContext.getBean(TenantCacheProcessor.class).update(updateTenant.getId());
-    }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java
deleted file mode 100644
index fb25fb5..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor.impl;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cache.annotation.CacheConfig;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.cache.annotation.Cacheable;
-import org.springframework.stereotype.Component;
-
-@Component
-@CacheConfig(cacheNames = "user")
-public class UserCacheProcessorImpl implements UserCacheProcessor {
-
-    @Autowired
-    private UserMapper userMapper;
-
-    @Override
-    @CacheEvict
-    public void update(int userId) {
-        // just evict cache
-    }
-
-    @Override
-    @Cacheable(sync = true)
-    public User selectById(int userId) {
-        return userMapper.selectById(userId);
-    }
-
-    @Override
-    public void cacheExpire(Class updateObjClass, String updateObjJson) {
-        User user = (User) JSONUtils.parseObject(updateObjJson, updateObjClass);
-        if (user == null) {
-            return;
-        }
-        SpringApplicationContext.getBean(UserCacheProcessor.class).update(user.getId());
-    }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 9143154..9992002 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -100,15 +100,15 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.dao.utils.DagHelper;
 import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
 import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
 import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
 import org.apache.dolphinscheduler.remote.utils.Host;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.log.LogClientService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -155,7 +155,7 @@ public class ProcessService {
             ExecutionStatus.READY_STOP.ordinal()};
 
     @Autowired
-    private UserCacheProcessor userCacheProcessor;
+    private UserMapper userMapper;
 
     @Autowired
     private ProcessDefinitionMapper processDefineMapper;
@@ -194,7 +194,7 @@ public class ProcessService {
     private ErrorCommandMapper errorCommandMapper;
 
     @Autowired
-    private TenantCacheProcessor tenantCacheProcessor;
+    private TenantMapper tenantMapper;
 
     @Autowired
     private ProjectMapper projectMapper;
@@ -233,8 +233,8 @@ public class ProcessService {
      * @return process instance
      */
     @Transactional
-    public ProcessInstance handleCommand(Logger logger, String host, Command command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
-        ProcessInstance processInstance = constructProcessInstance(command, host, processDefinitionCacheMaps);
+    public ProcessInstance handleCommand(Logger logger, String host, Command command) {
+        ProcessInstance processInstance = constructProcessInstance(command, host);
         // cannot construct process instance, return null
         if (processInstance == null) {
             logger.error("scan command, command parameter is error: {}", command);
@@ -739,7 +739,7 @@ public class ProcessService {
     public Tenant getTenantForProcess(int tenantId, int userId) {
         Tenant tenant = null;
         if (tenantId >= 0) {
-            tenant = tenantCacheProcessor.queryById(tenantId);
+            tenant = tenantMapper.queryById(tenantId);
         }
 
         if (userId == 0) {
@@ -747,8 +747,8 @@ public class ProcessService {
         }
 
         if (tenant == null) {
-            User user = userCacheProcessor.selectById(userId);
-            tenant = tenantCacheProcessor.queryById(user.getTenantId());
+            User user = userMapper.selectById(userId);
+            tenant = tenantMapper.queryById(user.getTenantId());
         }
         return tenant;
     }
@@ -794,19 +794,12 @@ public class ProcessService {
      * @param host host
      * @return process instance
      */
-    private ProcessInstance constructProcessInstance(Command command, String host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
+    private ProcessInstance constructProcessInstance(Command command, String host) {
         ProcessInstance processInstance;
         ProcessDefinition processDefinition;
         CommandType commandType = command.getCommandType();
-        String key = String.format("%d-%d", command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
-        if (processDefinitionCacheMaps.containsKey(key)) {
-            processDefinition = processDefinitionCacheMaps.get(key);
-        } else {
-            processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
-            if (processDefinition != null) {
-                processDefinitionCacheMaps.put(key, processDefinition);
-            }
-        }
+
+        processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
         if (processDefinition == null) {
             logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
             return null;
@@ -1964,11 +1957,11 @@ public class ProcessService {
             return StringUtils.EMPTY;
         }
         int userId = resourceList.get(0).getUserId();
-        User user = userCacheProcessor.selectById(userId);
+        User user = userMapper.selectById(userId);
         if (Objects.isNull(user)) {
             return StringUtils.EMPTY;
         }
-        Tenant tenant = tenantCacheProcessor.queryById(user.getTenantId());
+        Tenant tenant = tenantMapper.queryById(user.getTenantId());
         if (Objects.isNull(tenant)) {
             return StringUtils.EMPTY;
         }
@@ -2038,7 +2031,7 @@ public class ProcessService {
         if (processInstance == null) {
             return queue;
         }
-        User executor = userCacheProcessor.selectById(processInstance.getExecutorId());
+        User executor = userMapper.selectById(processInstance.getExecutorId());
         if (executor != null) {
             queue = executor.getQueue();
         }
@@ -2149,7 +2142,7 @@ public class ProcessService {
      * @return User
      */
     public User getUserById(int userId) {
-        return userCacheProcessor.selectById(userId);
+        return userMapper.selectById(userId);
     }
 
     /**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
index 83f1f00..04d1f78 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
@@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-import org.apache.dolphinscheduler.service.cache.service.impl.CacheNotifyServiceImpl;
+import org.apache.dolphinscheduler.service.cache.impl.CacheNotifyServiceImpl;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.ArrayList;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java
deleted file mode 100644
index ee61564..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Queue;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.impl.QueueCacheProcessorImpl;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-/**
- * tenant cache proxy test
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
-public class QueueCacheProcessorTest {
-
-    @Rule
-    public final ExpectedException exception = ExpectedException.none();
-
-    @InjectMocks
-    private QueueCacheProcessorImpl queueCacheProcessor;
-
-    @Before
-    public void before() {
-        PowerMockito.mockStatic(SpringApplicationContext.class);
-        PowerMockito.when(SpringApplicationContext.getBean(QueueCacheProcessor.class)).thenReturn(queueCacheProcessor);
-    }
-
-    @Test
-    public void testCacheExpire() {
-        Queue queue = new Queue();
-        queue.setId(100);
-        queueCacheProcessor.cacheExpire(Queue.class, JSONUtils.toJsonString(queue));
-    }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java
deleted file mode 100644
index 4ecc970..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-/**
- * tenant cache proxy test
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
-public class TenantCacheProcessorTest {
-
-    @Rule
-    public final ExpectedException exception = ExpectedException.none();
-
-    @InjectMocks
-    private TenantCacheProcessorImpl tenantCacheProcessor;
-
-    @Mock
-    private TenantMapper tenantMapper;
-
-    @Before
-    public void before() {
-        PowerMockito.mockStatic(SpringApplicationContext.class);
-        PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor);
-    }
-
-    @Test
-    public void testQueryById() {
-        Tenant tenant1 = new Tenant();
-        tenant1.setId(100);
-        tenant1.setDescription("test1");
-
-        Mockito.when(tenantMapper.queryById(100)).thenReturn(tenant1);
-        Assert.assertEquals(tenant1, tenantCacheProcessor.queryById(100));
-    }
-
-    @Test
-    public void testCacheExpire() {
-        Tenant tenant1 = new Tenant();
-        tenant1.setId(100);
-        tenant1.setDescription("test1");
-        tenantCacheProcessor.cacheExpire(Tenant.class, JSONUtils.toJsonString(tenant1));
-    }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java
deleted file mode 100644
index f786f43..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-/**
- * tenant cache proxy test
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
-public class UserCacheProxyTest {
-
-    @Rule
-    public final ExpectedException exception = ExpectedException.none();
-
-    @InjectMocks
-    private UserCacheProcessorImpl userCacheProcessor;
-
-    @Mock
-    private UserMapper userMapper;
-
-    @Before
-    public void before() {
-        PowerMockito.mockStatic(SpringApplicationContext.class);
-        PowerMockito.when(SpringApplicationContext.getBean(UserCacheProcessor.class)).thenReturn(userCacheProcessor);
-    }
-
-    @Test
-    public void testQueryById() {
-        User user1 = new User();
-        user1.setId(100);
-
-        Mockito.when(userMapper.selectById(100)).thenReturn(user1);
-        Assert.assertEquals(user1, userCacheProcessor.selectById(100));
-    }
-
-    @Test
-    public void testCacheExpire() {
-        User user = new User();
-        user.setId(100);
-        userCacheProcessor.cacheExpire(User.class, JSONUtils.toJsonString(user));
-    }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 2117b86..510e3a0 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -64,7 +64,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
-import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.service.exceptions.ServiceException;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
 
@@ -116,7 +116,7 @@ public class ProcessServiceTest {
     @Mock
     private ProcessInstanceMapper processInstanceMapper;
     @Mock
-    private UserCacheProcessorImpl userCacheProcessor;
+    private UserMapper userMapper;
     @Mock
     private TaskInstanceMapper taskInstanceMapper;
     @Mock
@@ -134,8 +134,6 @@ public class ProcessServiceTest {
     @Mock
     private TaskGroupQueueMapper taskGroupQueueMapper;
 
-    private HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new HashMap<>();
-
     @Test
     public void testCreateSubCommand() {
         ProcessInstance parentInstance = new ProcessInstance();
@@ -263,7 +261,7 @@ public class ProcessServiceTest {
         command.setCommandType(CommandType.REPEAT_RUNNING);
         command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
                 + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
-        Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps));
+        Assert.assertNull(processService.handleCommand(logger, host, command));
 
         int definitionVersion = 1;
         long definitionCode = 123;
@@ -298,7 +296,7 @@ public class ProcessServiceTest {
         Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
                 processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
         Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
+        Assert.assertNotNull(processService.handleCommand(logger, host, command1));
 
         Command command2 = new Command();
         command2.setId(2);
@@ -308,7 +306,7 @@ public class ProcessServiceTest {
         command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
         command2.setProcessInstanceId(processInstanceId);
         Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps));
+        Assert.assertNotNull(processService.handleCommand(logger, host, command2));
 
         Command command3 = new Command();
         command3.setId(3);
@@ -318,7 +316,7 @@ public class ProcessServiceTest {
         command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
         command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
         Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps));
+        Assert.assertNotNull(processService.handleCommand(logger, host, command3));
 
         Command command4 = new Command();
         command4.setId(4);
@@ -328,7 +326,7 @@ public class ProcessServiceTest {
         command4.setCommandType(CommandType.REPEAT_RUNNING);
         command4.setProcessInstanceId(processInstanceId);
         Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps));
+        Assert.assertNotNull(processService.handleCommand(logger, host, command4));
 
         Command command5 = new Command();
         command5.setId(5);
@@ -342,7 +340,7 @@ public class ProcessServiceTest {
         command5.setCommandType(CommandType.START_PROCESS);
         command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
         Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
-        ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps);
+        ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5);
         Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
 
         ProcessDefinition processDefinition1 = new ProcessDefinition();
@@ -367,7 +365,7 @@ public class ProcessServiceTest {
         Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
         Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
         Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
-        Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
+        Assert.assertNotNull(processService.handleCommand(logger, host, command1));
 
         Command command6 = new Command();
         command6.setId(6);
@@ -378,7 +376,7 @@ public class ProcessServiceTest {
         Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
         Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
         Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
-        ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6, processDefinitionCacheMaps);
+        ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6);
         Assert.assertTrue(processInstance6 != null);
 
         processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
@@ -397,7 +395,7 @@ public class ProcessServiceTest {
         command7.setProcessDefinitionVersion(1);
         Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
         Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
-        ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7, processDefinitionCacheMaps);
+        ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7);
         Assert.assertTrue(processInstance8 == null);
 
         ProcessDefinition processDefinition2 = new ProcessDefinition();
@@ -421,7 +419,7 @@ public class ProcessServiceTest {
         Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
         Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
         Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
-        ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9, processDefinitionCacheMaps);
+        ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9);
         Assert.assertTrue(processInstance10 == null);
     }
 
@@ -462,14 +460,14 @@ public class ProcessServiceTest {
         Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
 
         // will throw exception when command id is 0 and delete fail
-        processService.handleCommand(logger, host, command1, processDefinitionCacheMaps);
+        processService.handleCommand(logger, host, command1);
     }
 
     @Test
     public void testGetUserById() {
         User user = new User();
         user.setId(123);
-        Mockito.when(userCacheProcessor.selectById(123)).thenReturn(user);
+        Mockito.when(userMapper.selectById(123)).thenReturn(user);
         Assert.assertEquals(user, processService.getUserById(123));
     }