You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by le...@apache.org on 2021/12/03 15:13:19 UTC
[dolphinscheduler] branch dev updated: [Feature-6988][MasterServer] add cache manager for workflow (#7090)
This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ba2b2a6 [Feature-6988][MasterServer] add cache manager for workflow (#7090)
ba2b2a6 is described below
commit ba2b2a67c2ad9156dc7311937fa88bbf23e60ba7
Author: wind <ca...@users.noreply.github.com>
AuthorDate: Fri Dec 3 23:13:11 2021 +0800
[Feature-6988][MasterServer] add cache manager for workflow (#7090)
* add cache manager for workflow
* [DS-6988][MasterServer] add cache manager for workflow
* cache evict code optimization
* test
Co-authored-by: caishunfeng <53...@qq.com>
---
.../api/aspect/CacheEvictAspect.java | 102 ++++++++++++++++
.../service/impl/ProcessDefinitionServiceImpl.java | 128 ++++++++++----------
.../api/service/impl/QueueServiceImpl.java | 8 --
.../api/service/impl/TenantServiceImpl.java | 12 --
.../api/service/impl/UsersServiceImpl.java | 14 ---
.../api/service/QueueServiceTest.java | 58 ++++-----
.../api/service/TenantServiceTest.java | 12 +-
.../api/service/UsersServiceTest.java | 4 -
.../dolphinscheduler/common/enums/CacheType.java | 19 ++-
.../dao/datasource/SpringConnectionFactory.java | 2 +
.../dao/mapper/ProcessDefinitionLogMapper.java | 5 +
.../dao/mapper/ProcessDefinitionMapper.java | 15 ++-
.../dao/mapper/ProcessTaskRelationMapper.java | 13 ++
.../dao/mapper/TaskDefinitionLogMapper.java | 13 ++
.../dolphinscheduler/dao/mapper/TenantMapper.java | 29 ++++-
.../dolphinscheduler/dao/mapper/UserMapper.java | 46 ++++++-
.../server/master/processor/CacheProcessor.java | 132 +++++++++++++++++++--
.../master/runner/MasterSchedulerService.java | 13 +-
.../master/runner/WorkflowExecuteThread.java | 28 +++--
.../src/main/resources/application-master.yaml | 7 +-
.../master/processor/CacheProcessorTest.java | 19 ++-
.../cache/{service => }/CacheNotifyService.java | 2 +-
.../CacheKeyGenerator.java} | 20 +++-
.../{service => }/impl/CacheNotifyServiceImpl.java | 29 +++--
.../cache/processor/BaseCacheProcessor.java | 22 ----
.../cache/processor/QueueCacheProcessor.java | 22 ----
.../cache/processor/TenantCacheProcessor.java | 26 ----
.../processor/impl/CacheProcessorFactory.java | 58 ---------
.../processor/impl/QueueCacheProcessorImpl.java | 50 --------
.../processor/impl/TenantCacheProcessorImpl.java | 64 ----------
.../processor/impl/UserCacheProcessorImpl.java | 59 ---------
.../service/process/ProcessService.java | 39 +++---
.../service/cache/CacheNotifyServiceTest.java | 2 +-
.../cache/processor/QueueCacheProcessorTest.java | 60 ----------
.../cache/processor/TenantCacheProcessorTest.java | 78 ------------
.../cache/processor/UserCacheProxyTest.java | 76 ------------
.../service/process/ProcessServiceTest.java | 30 +++--
37 files changed, 543 insertions(+), 773 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
new file mode 100644
index 0000000..a989b6c
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.aspect;
+
+import org.apache.dolphinscheduler.common.enums.CacheType;
+import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
+import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
+import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
+
+import java.lang.reflect.Method;
+
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.stereotype.Component;
+
+/**
+ * aspect for cache evict
+ */
+@Aspect
+@Component
+public class CacheEvictAspect {
+
+ private static final String UPDATE_BY_ID = "updateById";
+
+ @Autowired
+ private CacheKeyGenerator cacheKeyGenerator;
+
+ @Autowired
+ private CacheNotifyService cacheNotifyService;
+
+ @Pointcut("@annotation(org.springframework.cache.annotation.CacheEvict)")
+ public void cacheEvictPointCut() {
+ // Do nothing because of it's a pointcut
+ }
+
+ @Around("cacheEvictPointCut()")
+ public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+ MethodSignature sign = (MethodSignature) proceedingJoinPoint.getSignature();
+ Method method = sign.getMethod();
+ Object target = proceedingJoinPoint.getTarget();
+ Object[] args = proceedingJoinPoint.getArgs();
+
+ Object result = proceedingJoinPoint.proceed();
+
+ CacheConfig cacheConfig = method.getDeclaringClass().getAnnotation(CacheConfig.class);
+ CacheEvict cacheEvict = method.getAnnotation(CacheEvict.class);
+
+ CacheType cacheType = getCacheType(cacheConfig, cacheEvict);
+ if (cacheType != null) {
+ // todo use springEL is better
+ if (method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length == 1) {
+ Object updateObj = args[0];
+ cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, updateObj).convert2Command());
+ } else {
+ Object key = cacheKeyGenerator.generate(target, method, args);
+ cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, key).convert2Command());
+ }
+ }
+
+ return result;
+ }
+
+ private CacheType getCacheType(CacheConfig cacheConfig, CacheEvict cacheEvict) {
+ String cacheName = null;
+ if (cacheEvict.cacheNames().length > 0) {
+ cacheName = cacheEvict.cacheNames()[0];
+ }
+ if (cacheConfig.cacheNames().length > 0) {
+ cacheName = cacheConfig.cacheNames()[0];
+ }
+ if (cacheName == null) {
+ return null;
+ }
+ for (CacheType cacheType : CacheType.values()) {
+ if (cacheType.getCacheName().equals(cacheName)) {
+ return cacheType;
+ }
+ }
+ return null;
+ }
+}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index e6a20b0..c447874 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -168,15 +168,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* create process definition
*
- * @param loginUser login user
- * @param projectCode project code
- * @param name process definition name
- * @param description description
- * @param globalParams global params
- * @param locations locations for nodes
- * @param timeout timeout
- * @param tenantCode tenantCode
- * @param taskRelationJson relation json for nodes
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param name process definition name
+ * @param description description
+ * @param globalParams global params
+ * @param locations locations for nodes
+ * @param timeout timeout
+ * @param tenantCode tenantCode
+ * @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
@@ -338,7 +338,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
* @return definition list
*/
@@ -360,7 +360,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition simple list
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
* @return definition simple list
*/
@@ -390,12 +390,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list paging
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param searchVal search value
- * @param userId user id
- * @param pageNo page number
- * @param pageSize page size
+ * @param searchVal search value
+ * @param userId user id
+ * @param pageNo page number
+ * @param pageSize page size
* @return process definition page
*/
@Override
@@ -433,9 +433,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query detail of process definition
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param code process definition code
+ * @param code process definition code
* @return process definition detail
*/
@Override
@@ -485,16 +485,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* update process definition
*
- * @param loginUser login user
- * @param projectCode project code
- * @param name process definition name
- * @param code process definition code
- * @param description description
- * @param globalParams global params
- * @param locations locations for nodes
- * @param timeout timeout
- * @param tenantCode tenantCode
- * @param taskRelationJson relation json for nodes
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param name process definition name
+ * @param code process definition code
+ * @param description description
+ * @param globalParams global params
+ * @param locations locations for nodes
+ * @param timeout timeout
+ * @param tenantCode tenantCode
+ * @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
@@ -605,9 +605,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* verify process definition name unique
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param name name
+ * @param name name
* @return true if process definition name not exists, otherwise false
*/
@Override
@@ -630,9 +630,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* delete process definition by code
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param code process definition code
+ * @param code process definition code
* @return delete result code
*/
@Override
@@ -700,9 +700,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* release process definition: online / offline
*
- * @param loginUser login user
- * @param projectCode project code
- * @param code process definition code
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param code process definition code
* @param releaseState release state
* @return release result code
*/
@@ -841,9 +841,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* import process definition
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param file process metadata json file
+ * @param file process metadata json file
* @return import process
*/
@Override
@@ -1051,9 +1051,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get task node details based on process definition
*
- * @param loginUser loginUser
+ * @param loginUser loginUser
* @param projectCode project code
- * @param code process definition code
+ * @param code process definition code
* @return task node list
*/
@Override
@@ -1080,9 +1080,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get task node details map based on process definition
*
- * @param loginUser loginUser
+ * @param loginUser loginUser
* @param projectCode project code
- * @param codes define codes
+ * @param codes define codes
* @return task node list
*/
@Override
@@ -1124,7 +1124,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition all by project code
*
- * @param loginUser loginUser
+ * @param loginUser loginUser
* @param projectCode project code
* @return process definitions in the project
*/
@@ -1226,11 +1226,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (taskInstance.isSubProcess()) {
TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode());
subProcessCode = Integer.parseInt(JSONUtils.parseObject(
- taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());
+ taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());
}
treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(),
- taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
- taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
+ taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
+ taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
}
}
for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
@@ -1291,9 +1291,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch copy process definition
*
- * @param loginUser loginUser
- * @param projectCode projectCode
- * @param codes processDefinitionCodes
+ * @param loginUser loginUser
+ * @param projectCode projectCode
+ * @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@@ -1314,9 +1314,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch move process definition
*
- * @param loginUser loginUser
- * @param projectCode projectCode
- * @param codes processDefinitionCodes
+ * @param loginUser loginUser
+ * @param projectCode projectCode
+ * @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@@ -1415,10 +1415,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* switch the defined process definition version
*
- * @param loginUser login user
+ * @param loginUser login user
* @param projectCode project code
- * @param code process definition code
- * @param version the version user want to switch
+ * @param code process definition code
+ * @param version the version user want to switch
* @return switch process definition version result code
*/
@Override
@@ -1454,11 +1454,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* check batch operate result
*
- * @param srcProjectCode srcProjectCode
+ * @param srcProjectCode srcProjectCode
* @param targetProjectCode targetProjectCode
- * @param result result
+ * @param result result
* @param failedProcessList failedProcessList
- * @param isCopy isCopy
+ * @param isCopy isCopy
*/
private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode,
Map<String, Object> result, List<String> failedProcessList, boolean isCopy) {
@@ -1476,11 +1476,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query the pagination versions info by one certain process definition code
*
- * @param loginUser login user info to check auth
+ * @param loginUser login user info to check auth
* @param projectCode project code
- * @param pageNo page number
- * @param pageSize page size
- * @param code process definition code
+ * @param pageNo page number
+ * @param pageSize page size
+ * @param code process definition code
* @return the pagination process definition versions info of the certain process definition
*/
@Override
@@ -1510,10 +1510,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* delete one certain process definition by version number and process definition code
*
- * @param loginUser login user info to check auth
+ * @param loginUser login user info to check auth
* @param projectCode project code
- * @param code process definition code
- * @param version version number
+ * @param code process definition code
+ * @param version version number
* @return delete result code
*/
@Override
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
index 14c57cc..2da89df 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
@@ -22,13 +22,10 @@ import org.apache.dolphinscheduler.api.service.QueueService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.commons.lang.StringUtils;
@@ -59,9 +56,6 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
@Autowired
private UserMapper userMapper;
- @Autowired
- private CacheNotifyService cacheNotifyService;
-
/**
* query queue list
*
@@ -229,8 +223,6 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
queueMapper.updateById(queueObj);
- cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.QUEUE, queueObj).convert2Command());
-
putMsg(result, Status.SUCCESS);
return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 58f6bbc..0601725 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.RegexUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -34,8 +33,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@@ -70,9 +67,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
@Autowired
private UserMapper userMapper;
- @Autowired
- private CacheNotifyService cacheNotifyService;
-
/**
* create tenant
*
@@ -220,9 +214,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
tenant.setUpdateTime(now);
tenantMapper.updateById(tenant);
- // notify master to expire cache
- cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command());
-
result.put(Constants.STATUS, Status.SUCCESS);
result.put(Constants.MSG, Status.SUCCESS.getMsg());
return result;
@@ -282,9 +273,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
tenantMapper.deleteById(id);
processInstanceMapper.updateProcessInstanceByTenantId(id, -1);
- // notify master to expire cache
- cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command());
-
putMsg(result, Status.SUCCESS);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index ff5dc7d..5199d4f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
-import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils;
@@ -53,8 +52,6 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
-import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
@@ -121,10 +118,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
@Autowired
private ProjectMapper projectMapper;
- @Autowired
- private CacheNotifyService cacheNotifyService;
-
-
/**
* create user, only system admin have permission
*
@@ -479,7 +472,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
// updateProcessInstance user
userMapper.updateById(user);
- cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command());
putMsg(result, Status.SUCCESS);
return result;
@@ -531,10 +523,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
userMapper.deleteById(id);
- if (user != null) {
- cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command());
- }
-
putMsg(result, Status.SUCCESS);
return result;
@@ -1079,8 +1067,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
user.setUpdateTime(now);
userMapper.updateById(user);
- cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command());
-
User responseUser = userMapper.queryByUserNameAccurately(userName);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, responseUser);
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
index 7f1a2ed..f3167a5 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.commons.collections.CollectionUtils;
@@ -62,9 +61,6 @@ public class QueueServiceTest {
private QueueServiceImpl queueService;
@Mock
- private CacheNotifyService cacheNotifyService;
-
- @Mock
private QueueMapper queueMapper;
@Mock
@@ -77,7 +73,7 @@ public class QueueServiceTest {
}
@After
- public void after(){
+ public void after() {
}
@Test
@@ -86,7 +82,7 @@ public class QueueServiceTest {
Mockito.when(queueMapper.selectList(null)).thenReturn(getQueueList());
Map<String, Object> result = queueService.queryList(getLoginUser());
logger.info(result.toString());
- List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
+ List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
Assert.assertTrue(CollectionUtils.isNotEmpty(queueList));
}
@@ -94,13 +90,13 @@ public class QueueServiceTest {
@Test
public void testQueryListPage() {
- IPage<Queue> page = new Page<>(1,10);
+ IPage<Queue> page = new Page<>(1, 10);
page.setTotal(1L);
page.setRecords(getQueueList());
Mockito.when(queueMapper.queryQueuePaging(Mockito.any(Page.class), Mockito.eq(queueName))).thenReturn(page);
- Result result = queueService.queryList(getLoginUser(),queueName,1,10);
+ Result result = queueService.queryList(getLoginUser(), queueName, 1, 10);
logger.info(result.toString());
- PageInfo<Queue> pageInfo = (PageInfo<Queue>) result.getData();
+ PageInfo<Queue> pageInfo = (PageInfo<Queue>) result.getData();
Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getTotalList()));
}
@@ -108,17 +104,17 @@ public class QueueServiceTest {
public void testCreateQueue() {
// queue is null
- Map<String, Object> result = queueService.createQueue(getLoginUser(),null,queueName);
+ Map<String, Object> result = queueService.createQueue(getLoginUser(), null, queueName);
logger.info(result.toString());
- Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS));
+ Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
// queueName is null
- result = queueService.createQueue(getLoginUser(),queueName,null);
+ result = queueService.createQueue(getLoginUser(), queueName, null);
logger.info(result.toString());
- Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS));
+ Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
// correct
- result = queueService.createQueue(getLoginUser(),queueName,queueName);
+ result = queueService.createQueue(getLoginUser(), queueName, queueName);
logger.info(result.toString());
- Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
+ Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@@ -130,25 +126,25 @@ public class QueueServiceTest {
Mockito.when(queueMapper.existQueue(null, "test")).thenReturn(true);
// not exist
- Map<String, Object> result = queueService.updateQueue(getLoginUser(),0,"queue",queueName);
+ Map<String, Object> result = queueService.updateQueue(getLoginUser(), 0, "queue", queueName);
logger.info(result.toString());
- Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
//no need update
- result = queueService.updateQueue(getLoginUser(),1,queueName,queueName);
+ result = queueService.updateQueue(getLoginUser(), 1, queueName, queueName);
logger.info(result.toString());
- Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
//queue exist
- result = queueService.updateQueue(getLoginUser(),1,"test",queueName);
+ result = queueService.updateQueue(getLoginUser(), 1, "test", queueName);
logger.info(result.toString());
- Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
// queueName exist
- result = queueService.updateQueue(getLoginUser(),1,"test1","test");
+ result = queueService.updateQueue(getLoginUser(), 1, "test1", "test");
logger.info(result.toString());
- Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
//success
- result = queueService.updateQueue(getLoginUser(),1,"test1","test1");
+ result = queueService.updateQueue(getLoginUser(), 1, "test1", "test1");
logger.info(result.toString());
- Assert.assertEquals(Status.SUCCESS.getCode(),((Status)result.get(Constants.STATUS)).getCode());
+ Assert.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
}
@@ -159,27 +155,27 @@ public class QueueServiceTest {
Mockito.when(queueMapper.existQueue(null, queueName)).thenReturn(true);
//queue null
- Result result = queueService.verifyQueue(null,queueName);
+ Result result = queueService.verifyQueue(null, queueName);
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode());
//queueName null
- result = queueService.verifyQueue(queueName,null);
+ result = queueService.verifyQueue(queueName, null);
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode());
//exist queueName
- result = queueService.verifyQueue(queueName,queueName);
+ result = queueService.verifyQueue(queueName, queueName);
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.QUEUE_NAME_EXIST.getCode());
//exist queue
- result = queueService.verifyQueue(queueName,"test");
+ result = queueService.verifyQueue(queueName, "test");
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.QUEUE_VALUE_EXIST.getCode());
// success
- result = queueService.verifyQueue("test","test");
+ result = queueService.verifyQueue("test", "test");
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.SUCCESS.getCode());
@@ -187,7 +183,6 @@ public class QueueServiceTest {
/**
* create admin user
- * @return
*/
private User getLoginUser() {
@@ -205,7 +200,6 @@ public class QueueServiceTest {
/**
* get queue
- * @return
*/
private Queue getQueue() {
Queue queue = new Queue();
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
index d9e1e97..e1c00d2 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
@@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.commons.collections.CollectionUtils;
@@ -64,9 +63,6 @@ public class TenantServiceTest {
private TenantServiceImpl tenantService;
@Mock
- private CacheNotifyService cacheNotifyService;
-
- @Mock
private TenantMapper tenantMapper;
@Mock
@@ -88,7 +84,7 @@ public class TenantServiceTest {
try {
//check tenantCode
Map<String, Object> result =
- tenantService.createTenant(getLoginUser(), "%!1111", 1, "TenantServiceTest");
+ tenantService.createTenant(getLoginUser(), "%!1111", 1, "TenantServiceTest");
logger.info(result.toString());
Assert.assertEquals(Status.CHECK_OS_TENANT_CODE_ERROR, result.get(Constants.STATUS));
@@ -116,7 +112,7 @@ public class TenantServiceTest {
page.setRecords(getList());
page.setTotal(1L);
Mockito.when(tenantMapper.queryTenantPaging(Mockito.any(Page.class), Mockito.eq("TenantServiceTest")))
- .thenReturn(page);
+ .thenReturn(page);
Result result = tenantService.queryTenantList(getLoginUser(), "TenantServiceTest", 1, 10);
logger.info(result.toString());
PageInfo<Tenant> pageInfo = (PageInfo<Tenant>) result.getData();
@@ -131,7 +127,7 @@ public class TenantServiceTest {
try {
// id not exist
Map<String, Object> result =
- tenantService.updateTenant(getLoginUser(), 912222, tenantCode, 1, "desc");
+ tenantService.updateTenant(getLoginUser(), 912222, tenantCode, 1, "desc");
logger.info(result.toString());
// success
Assert.assertEquals(Status.TENANT_NOT_EXIST, result.get(Constants.STATUS));
@@ -150,7 +146,7 @@ public class TenantServiceTest {
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(processInstanceMapper.queryByTenantIdAndStatus(1, Constants.NOT_TERMINATED_STATES))
- .thenReturn(getInstanceList());
+ .thenReturn(getInstanceList());
Mockito.when(processDefinitionMapper.queryDefinitionListByTenant(2)).thenReturn(getDefinitionsList());
Mockito.when(userMapper.queryUserListByTenant(3)).thenReturn(getUserList());
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
index 01db35b..e586db8 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
@@ -43,7 +43,6 @@ import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
@@ -80,9 +79,6 @@ public class UsersServiceTest {
private UsersServiceImpl usersService;
@Mock
- private CacheNotifyService cacheNotifyService;
-
- @Mock
private UserMapper userMapper;
@Mock
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
index db31eea..f1921db 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
@@ -18,7 +18,20 @@
package org.apache.dolphinscheduler.common.enums;
public enum CacheType {
- TENANT,
- USER,
- QUEUE;
+ TENANT("tenant"),
+ USER("user"),
+ QUEUE("queue"),
+ PROCESS_DEFINITION("processDefinition"),
+ PROCESS_TASK_RELATION("processTaskRelation"),
+ TASK_DEFINITION("taskDefinition");
+
+ CacheType(String cacheName) {
+ this.cacheName = cacheName;
+ }
+
+ private final String cacheName;
+
+ public String getCacheName() {
+ return cacheName;
+ }
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
index 5381495..2b4f99c 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
@@ -42,6 +42,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
@Configuration
public class SpringConnectionFactory {
+
@Bean
public PaginationInterceptor paginationInterceptor() {
return new PaginationInterceptor();
@@ -60,6 +61,7 @@ public class SpringConnectionFactory {
configuration.setCallSettersOnNulls(true);
configuration.setJdbcTypeForNull(JdbcType.NULL);
configuration.addInterceptor(paginationInterceptor());
+
configuration.setGlobalConfig(new GlobalConfig().setBanner(false));
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
index 51ccb73..0e03b50 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
@@ -23,6 +23,9 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -30,6 +33,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* process definition log mapper interface
*/
+@CacheConfig(cacheNames = "processDefinition")
public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinitionLog> {
/**
@@ -66,6 +70,7 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
* @param version version number
* @return the process definition version info
*/
+ @Cacheable(sync = true, key = "#processDefinitionCode + '_' + #version")
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);
/**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
index 3a731ea..ca287e2 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
-import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.ibatis.annotations.MapKey;
import org.apache.ibatis.annotations.Param;
@@ -28,13 +27,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* process definition mapper interface
*/
+@CacheConfig(cacheNames = "processDefinition", keyGenerator = "cacheKeyGenerator")
public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
/**
@@ -43,9 +46,16 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* @param code code
* @return process definition
*/
+ @Cacheable(sync = true)
ProcessDefinition queryByCode(@Param("code") long code);
/**
+ * update
+ */
+ @CacheEvict
+ int updateById(@Param("et") ProcessDefinition processDefinition);
+
+ /**
* query process definition by code list
*
* @param codes codes
@@ -59,6 +69,7 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* @param code code
* @return delete result
*/
+ @CacheEvict
int deleteByCode(@Param("code") long code);
/**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
index 5b74f46..5b3ea75 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
@@ -25,11 +25,16 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* process task relation mapper interface
*/
+@CacheConfig(cacheNames = "processTaskRelation", keyGenerator = "cacheKeyGenerator")
public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelation> {
/**
@@ -39,10 +44,17 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param processCode processCode
* @return ProcessTaskRelation list
*/
+ @Cacheable(sync = true)
List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") long projectCode,
@Param("processCode") long processCode);
/**
+ * update
+ */
+ @CacheEvict
+ int updateById(@Param("et") ProcessTaskRelation processTaskRelation);
+
+ /**
* process task relation by taskCode
*
* @param taskCodes taskCode list
@@ -65,6 +77,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param processCode processCode
* @return int
*/
+ @CacheEvict
int deleteByCode(@Param("projectCode") long projectCode,
@Param("processCode") long processCode);
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
index ab2620f..75e1b29 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
@@ -25,6 +25,10 @@ import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -32,6 +36,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task definition log mapper interface
*/
+@CacheConfig(cacheNames = "taskDefinition", keyGenerator = "cacheKeyGenerator")
public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
/**
@@ -48,10 +53,17 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* @param version version
* @return task definition log
*/
+ @Cacheable(sync = true, key = "#taskCode + '_' + #taskDefinitionVersion")
TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code,
@Param("version") int version);
/**
+ * update
+ */
+ @CacheEvict
+ int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog);
+
+ /**
* @param taskDefinitions taskDefinition list
* @return list
*/
@@ -72,6 +84,7 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* @param version task definition version
* @return delete result
*/
+ @CacheEvict
int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int version);
/**
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
index 89b6237..843122f 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
@@ -14,29 +14,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.Tenant;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
+
import org.apache.ibatis.annotations.Param;
-import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* tenant mapper interface
*/
+@CacheConfig(cacheNames = "tenant", keyGenerator = "cacheKeyGenerator")
public interface TenantMapper extends BaseMapper<Tenant> {
/**
* query tenant by id
+ *
* @param tenantId tenantId
* @return tenant
*/
+ @Cacheable(sync = true)
Tenant queryById(@Param("tenantId") int tenantId);
/**
+ * delete by id
+ */
+ @CacheEvict
+ int deleteById(int id);
+
+ /**
+ * update
+ */
+ @CacheEvict
+ int updateById(@Param("et") Tenant tenant);
+
+ /**
* query tenant by code
+ *
* @param tenantCode tenantCode
* @return tenant
*/
@@ -44,6 +65,7 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/**
* tenant page
+ *
* @param page page
* @param searchVal searchVal
* @return tenant IPage
@@ -53,6 +75,7 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/**
* check tenant exist
+ *
* @param tenantCode tenantCode
* @return true if exist else return null
*/
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
index 4418363..20fafdc 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
@@ -14,29 +14,57 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.User;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-import com.baomidou.mybatisplus.core.metadata.IPage;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
import org.apache.ibatis.annotations.Param;
import java.util.List;
+import org.springframework.cache.annotation.CacheConfig;
+import org.springframework.cache.annotation.CacheEvict;
+import org.springframework.cache.annotation.Cacheable;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+
/**
* user mapper interface
*/
+@CacheConfig(cacheNames = "user", keyGenerator = "cacheKeyGenerator")
public interface UserMapper extends BaseMapper<User> {
/**
+ * select by user id
+ */
+ @Cacheable(sync = true)
+ User selectById(int id);
+
+ /**
+ * delete by id
+ */
+ @CacheEvict
+ int deleteById(int id);
+
+ /**
+ * update
+ */
+ @CacheEvict
+ int updateById(@Param("et") User user);
+
+ /**
* query all general user
+ *
* @return user list
*/
List<User> queryAllGeneralUser();
/**
* query user by name
+ *
* @param userName userName
* @return user
*/
@@ -44,6 +72,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by userName and password
+ *
* @param userName userName
* @param password password
* @return user
@@ -53,6 +82,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* user page
+ *
* @param page page
* @param userName userName
* @return user IPage
@@ -62,6 +92,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user detail by id
+ *
* @param userId userId
* @return user
*/
@@ -69,6 +100,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user list by alertgroupId
+ *
* @param alertgroupId alertgroupId
* @return user list
*/
@@ -76,6 +108,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user list by tenantId
+ *
* @param tenantId tenantId
* @return user list
*/
@@ -83,6 +116,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by userId
+ *
* @param userId userId
* @return user
*/
@@ -90,6 +124,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by token
+ *
* @param token token
* @return user
*/
@@ -97,6 +132,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by queue name
+ *
* @param queueName queue name
* @return user list
*/
@@ -104,13 +140,15 @@ public interface UserMapper extends BaseMapper<User> {
/**
* check the user exist
- * @param queueName queue name
+ *
+ * @param queue queue name
* @return true if exist else return null
*/
Boolean existUser(@Param("queue") String queue);
/**
* update user with old queue
+ *
* @param oldQueue old queue name
* @param newQueue new queue name
* @return update rows
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
index 64571b8..778d1ae 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
@@ -17,16 +17,24 @@
package org.apache.dolphinscheduler.server.master.processor;
+import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
+import org.apache.dolphinscheduler.dao.entity.Queue;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
import com.google.common.base.Preconditions;
@@ -39,11 +47,7 @@ public class CacheProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(CacheProcessor.class);
- private CacheProcessorFactory cacheProcessorFactory;
-
- public CacheProcessor() {
- this.cacheProcessorFactory = SpringApplicationContext.getBean(CacheProcessorFactory.class);
- }
+ private CacheManager cacheManager;
@Override
public void process(Channel channel, Command command) {
@@ -53,6 +57,120 @@ public class CacheProcessor implements NettyRequestProcessor {
logger.info("received command : {}", cacheExpireCommand);
- cacheProcessorFactory.getCacheProcessor(cacheExpireCommand.getCacheType()).cacheExpire(cacheExpireCommand.getUpdateObjClass(), cacheExpireCommand.getUpdateObjJson());
+ this.cacheExpire(cacheExpireCommand);
+ }
+
+ private void cacheExpire(CacheExpireCommand cacheExpireCommand) {
+ if (cacheManager == null) {
+ cacheManager = SpringApplicationContext.getBean(CacheManager.class);
+ }
+
+ Object object = JSONUtils.parseObject(cacheExpireCommand.getUpdateObjJson(), cacheExpireCommand.getUpdateObjClass());
+ if (object == null) {
+ return;
+ }
+
+ CacheType cacheType = cacheExpireCommand.getCacheType();
+ switch (cacheType) {
+ case TENANT:
+ if (object instanceof Tenant) {
+ Tenant tenant = (Tenant) object;
+ tenantCacheExpire(tenant);
+ }
+ break;
+ case USER:
+ if (object instanceof User) {
+ User user = (User) object;
+ userCacheExpire(user);
+ }
+ break;
+ case QUEUE:
+ if (object instanceof Queue) {
+ Queue queue = (Queue) object;
+ queueCacheExpire(queue);
+ }
+ break;
+ case PROCESS_DEFINITION:
+ if (object instanceof ProcessDefinition) {
+ ProcessDefinition processDefinition = (ProcessDefinition) object;
+ processDefinitionCacheExpire(processDefinition);
+ }
+ break;
+ case TASK_DEFINITION:
+ if (object instanceof TaskDefinition) {
+ TaskDefinition taskDefinition = (TaskDefinition) object;
+ taskDefinitionCacheExpire(taskDefinition);
+ }
+ break;
+ case PROCESS_TASK_RELATION:
+ if (object instanceof ProcessTaskRelation) {
+ ProcessTaskRelation processTaskRelation = (ProcessTaskRelation) object;
+ processTaskRelationCacheExpire(processTaskRelation);
+ }
+ break;
+ default:
+ logger.error("no support cache type:{}", cacheType);
+ }
+
+ // if delete operation, just send key
+ if (object instanceof String) {
+ Cache cache = cacheManager.getCache(cacheType.getCacheName());
+ if (cache != null) {
+ cache.evict(object);
+ logger.info("cache evict, type:{}, key:{}", cacheType.getCacheName(), object);
+ }
+ }
+ }
+
+ private void tenantCacheExpire(Tenant tenant) {
+ Cache cache = cacheManager.getCache(CacheType.TENANT.getCacheName());
+ if (cache != null) {
+ cache.evict(tenant.getId());
+ logger.info("cache evict, type:{}, key:{}", CacheType.TENANT.getCacheName(), tenant.getId());
+ }
+ }
+
+ private void userCacheExpire(User user) {
+ Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
+ if (cache != null) {
+ cache.evict(user.getId());
+ logger.info("cache evict, type:{}, key:{}", CacheType.USER.getCacheName(), user.getId());
+ }
+ }
+
+ private void queueCacheExpire(Queue queue) {
+ Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
+ if (cache != null) {
+ cache.clear();
+ logger.info("cache evict, type:{}, clear", CacheType.USER.getCacheName());
+ }
+ }
+
+ private void processDefinitionCacheExpire(ProcessDefinition processDefinition) {
+ Cache cache = cacheManager.getCache(CacheType.PROCESS_DEFINITION.getCacheName());
+ if (cache != null) {
+ cache.evict(processDefinition.getCode());
+ cache.evict(processDefinition.getCode() + "_" + processDefinition.getVersion());
+ logger.info("cache evict, type:{}, key:{}",
+ CacheType.PROCESS_DEFINITION.getCacheName(), processDefinition.getCode() + "_" + processDefinition.getVersion());
+ }
+ }
+
+ private void processTaskRelationCacheExpire(ProcessTaskRelation processTaskRelation) {
+ Cache cache = cacheManager.getCache(CacheType.PROCESS_TASK_RELATION.getCacheName());
+ if (cache != null) {
+ cache.evict(processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode());
+ logger.info("cache evict, type:{}, key:{}",
+ CacheType.PROCESS_TASK_RELATION.getCacheName(), processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode());
+ }
+ }
+
+ private void taskDefinitionCacheExpire(TaskDefinition taskDefinition) {
+ Cache cache = cacheManager.getCache(CacheType.TASK_DEFINITION.getCacheName());
+ if (cache != null) {
+ cache.evict(taskDefinition.getCode() + "_" + taskDefinition.getVersion());
+ logger.info("cache evict, type:{}, key:{}",
+ CacheType.TASK_DEFINITION.getCacheName(), taskDefinition.getCode() + "_" + taskDefinition.getVersion());
+ }
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index 2b8287b..6e05da9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -118,12 +118,6 @@ public class MasterSchedulerService extends Thread {
*/
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
- /**
- * key:code-version
- * value: processDefinition
- */
- HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new HashMap<>();
-
private StateWheelExecuteThread stateWheelExecuteThread;
/**
@@ -195,10 +189,6 @@ public class MasterSchedulerService extends Thread {
return;
}
- if (!masterConfig.isCacheProcessDefinition() && processDefinitionCacheMaps.size() > 0) {
- processDefinitionCacheMaps.clear();
- }
-
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
return;
@@ -245,8 +235,7 @@ public class MasterSchedulerService extends Thread {
try {
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
- command,
- processDefinitionCacheMaps);
+ command);
if (processInstance != null) {
processInstances[index] = processInstance;
logger.info("handle command command {} end, create process instance {}",
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 059c371..abc8872 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -737,19 +737,21 @@ public class WorkflowExecuteThread implements Runnable {
completeTaskMap.clear();
errorTaskMap.clear();
- List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
- for (TaskInstance task : validTaskInstanceList) {
- validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
- taskInstanceMap.put(task.getId(), task);
-
- if (task.isTaskComplete()) {
- completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
- }
- if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
- continue;
- }
- if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
- errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ if (ExecutionStatus.SUBMITTED_SUCCESS != processInstance.getState()) {
+ List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
+ for (TaskInstance task : validTaskInstanceList) {
+ validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ taskInstanceMap.put(task.getId(), task);
+
+ if (task.isTaskComplete()) {
+ completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ }
+ if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
+ continue;
+ }
+ if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
+ errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
+ }
}
}
diff --git a/dolphinscheduler-server/src/main/resources/application-master.yaml b/dolphinscheduler-server/src/main/resources/application-master.yaml
index 86796c1..866b200 100644
--- a/dolphinscheduler-server/src/main/resources/application-master.yaml
+++ b/dolphinscheduler-server/src/main/resources/application-master.yaml
@@ -19,12 +19,15 @@ spring:
name: master-server
cache:
# default enable cache, you can disable by `type: none`
- type: caffeine
+ type: none
cache-names:
- tenant
- user
+ - processDefinition
+ - processTaskRelation
+ - taskDefinition
caffeine:
- spec: maximumSize=100,expireAfterWrite=60s,recordStats
+ spec: maximumSize=100,expireAfterWrite=300s,recordStats
master:
listen-port: 5678
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
index 02f3216..6f1907c 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
@@ -22,19 +22,17 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory;
-import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.cache.Cache;
+import org.springframework.cache.CacheManager;
import io.netty.channel.Channel;
@@ -47,21 +45,20 @@ public class CacheProcessorTest {
private CacheProcessor cacheProcessor;
- @InjectMocks
- private TenantCacheProcessorImpl tenantCacheProcessor;
-
@Mock
private Channel channel;
@Mock
- private CacheProcessorFactory cacheProcessorFactory;
+ private CacheManager cacheManager;
+
+ @Mock
+ private Cache cache;
@Before
public void before() {
PowerMockito.mockStatic(SpringApplicationContext.class);
- PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor);
- PowerMockito.when(SpringApplicationContext.getBean(CacheProcessorFactory.class)).thenReturn(cacheProcessorFactory);
- Mockito.when(cacheProcessorFactory.getCacheProcessor(CacheType.TENANT)).thenReturn(tenantCacheProcessor);
+ PowerMockito.when(SpringApplicationContext.getBean(CacheManager.class)).thenReturn(cacheManager);
+ Mockito.when(cacheManager.getCache(CacheType.TENANT.getCacheName())).thenReturn(cache);
cacheProcessor = new CacheProcessor();
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
similarity index 94%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
index 3b051b6..09c5571 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.cache.service;
+package org.apache.dolphinscheduler.service.cache;
import org.apache.dolphinscheduler.remote.command.Command;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
similarity index 61%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
index 1d93b42..2a03654 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
@@ -15,12 +15,22 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.cache.processor;
+package org.apache.dolphinscheduler.service.cache.impl;
-import org.apache.dolphinscheduler.dao.entity.User;
+import java.lang.reflect.Method;
-public interface UserCacheProcessor extends BaseCacheProcessor {
- void update(int userId);
+import org.springframework.cache.interceptor.KeyGenerator;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
- User selectById(int userId);
+/**
+ * custom cache key generator
+ */
+@Component
+public class CacheKeyGenerator implements KeyGenerator {
+
+ @Override
+ public Object generate(Object target, Method method, Object... params) {
+ return StringUtils.arrayToDelimitedString(params, "_");
+ }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
similarity index 82%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
index 1aa15c2..ffa9299 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.service.cache.service.impl;
+package org.apache.dolphinscheduler.service.cache.impl;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
+import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
@@ -114,19 +114,22 @@ public class CacheNotifyServiceImpl implements CacheNotifyService {
@Override
public void notifyMaster(Command command) {
logger.info("send result, command:{}", command.toString());
+ try {
+ List<Server> serverList = registryClient.getServerList(NodeType.MASTER);
+ if (CollectionUtils.isEmpty(serverList)) {
+ return;
+ }
- List<Server> serverList = registryClient.getServerList(NodeType.MASTER);
- if (CollectionUtils.isEmpty(serverList)) {
- return;
- }
-
- for (Server server : serverList) {
- Host host = new Host(server.getHost(), server.getPort());
- NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host);
- if (nettyRemoteChannel == null) {
- continue;
+ for (Server server : serverList) {
+ Host host = new Host(server.getHost(), server.getPort());
+ NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host);
+ if (nettyRemoteChannel == null) {
+ continue;
+ }
+ nettyRemoteChannel.writeAndFlush(command);
}
- nettyRemoteChannel.writeAndFlush(command);
+ } catch (Exception e) {
+ logger.error("notify master error", e);
}
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java
deleted file mode 100644
index 2b0dc8b..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-public interface BaseCacheProcessor {
- void cacheExpire(Class updateObjClass, String updateObjJson);
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java
deleted file mode 100644
index 4b438eb..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-public interface QueueCacheProcessor extends BaseCacheProcessor {
- public void expireAllUserCache();
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java
deleted file mode 100644
index c6b8000..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-
-public interface TenantCacheProcessor extends BaseCacheProcessor {
- void update(int tenantId);
-
- Tenant queryById(int tenantId);
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java
deleted file mode 100644
index 16a6ccd..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor.impl;
-
-import org.apache.dolphinscheduler.common.enums.CacheType;
-import org.apache.dolphinscheduler.service.cache.processor.BaseCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.PostConstruct;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class CacheProcessorFactory {
-
- @Autowired
- private TenantCacheProcessor tenantCacheProcessor;
-
- @Autowired
- private UserCacheProcessor userCacheProcessor;
-
- @Autowired
- private QueueCacheProcessor queueCacheProcessor;
-
- Map<CacheType, BaseCacheProcessor> cacheProcessorMap = new ConcurrentHashMap<>();
-
- @PostConstruct
- private void init() {
- cacheProcessorMap.put(CacheType.TENANT, tenantCacheProcessor);
- cacheProcessorMap.put(CacheType.USER, userCacheProcessor);
- cacheProcessorMap.put(CacheType.QUEUE, queueCacheProcessor);
- }
-
- public BaseCacheProcessor getCacheProcessor(CacheType cacheType) {
- return cacheProcessorMap.get(cacheType);
- }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java
deleted file mode 100644
index 174d59f..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor.impl;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Queue;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.stereotype.Component;
-
-@Component
-public class QueueCacheProcessorImpl implements QueueCacheProcessor {
-
- private final Logger logger = LoggerFactory.getLogger(getClass());
-
- @Override
- @CacheEvict(cacheNames = "user", allEntries = true)
- public void expireAllUserCache() {
- // just evict cache
- logger.debug("expire all user cache");
- }
-
- @Override
- public void cacheExpire(Class updateObjClass, String updateObjJson) {
- Queue updateQueue = (Queue) JSONUtils.parseObject(updateObjJson, updateObjClass);
- if (updateQueue == null) {
- return;
- }
- SpringApplicationContext.getBean(QueueCacheProcessor.class).expireAllUserCache();
- }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java
deleted file mode 100644
index 3ca8014..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor.impl;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cache.annotation.CacheConfig;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.cache.annotation.Cacheable;
-import org.springframework.stereotype.Component;
-
-@Component
-@CacheConfig(cacheNames = "tenant")
-public class TenantCacheProcessorImpl implements TenantCacheProcessor {
-
- private final Logger logger = LoggerFactory.getLogger(getClass());
-
- @Autowired
- private TenantMapper tenantMapper;
-
- @Override
- @CacheEvict
- public void update(int tenantId) {
- // just evict cache
- }
-
- @Override
- @Cacheable(sync = true)
- public Tenant queryById(int tenantId) {
- logger.debug("tenant cache proxy, tenantId:{}", tenantId);
- return tenantMapper.queryById(tenantId);
- }
-
- @Override
- public void cacheExpire(Class updateObjClass, String updateObjJson) {
- Tenant updateTenant = (Tenant) JSONUtils.parseObject(updateObjJson, updateObjClass);
- if (updateTenant == null) {
- return;
- }
- SpringApplicationContext.getBean(TenantCacheProcessor.class).update(updateTenant.getId());
- }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java
deleted file mode 100644
index fb25fb5..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor.impl;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cache.annotation.CacheConfig;
-import org.springframework.cache.annotation.CacheEvict;
-import org.springframework.cache.annotation.Cacheable;
-import org.springframework.stereotype.Component;
-
-@Component
-@CacheConfig(cacheNames = "user")
-public class UserCacheProcessorImpl implements UserCacheProcessor {
-
- @Autowired
- private UserMapper userMapper;
-
- @Override
- @CacheEvict
- public void update(int userId) {
- // just evict cache
- }
-
- @Override
- @Cacheable(sync = true)
- public User selectById(int userId) {
- return userMapper.selectById(userId);
- }
-
- @Override
- public void cacheExpire(Class updateObjClass, String updateObjJson) {
- User user = (User) JSONUtils.parseObject(updateObjJson, updateObjClass);
- if (user == null) {
- return;
- }
- SpringApplicationContext.getBean(UserCacheProcessor.class).update(user.getId());
- }
-}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 9143154..9992002 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -100,15 +100,15 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
-import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@@ -155,7 +155,7 @@ public class ProcessService {
ExecutionStatus.READY_STOP.ordinal()};
@Autowired
- private UserCacheProcessor userCacheProcessor;
+ private UserMapper userMapper;
@Autowired
private ProcessDefinitionMapper processDefineMapper;
@@ -194,7 +194,7 @@ public class ProcessService {
private ErrorCommandMapper errorCommandMapper;
@Autowired
- private TenantCacheProcessor tenantCacheProcessor;
+ private TenantMapper tenantMapper;
@Autowired
private ProjectMapper projectMapper;
@@ -233,8 +233,8 @@ public class ProcessService {
* @return process instance
*/
@Transactional
- public ProcessInstance handleCommand(Logger logger, String host, Command command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
- ProcessInstance processInstance = constructProcessInstance(command, host, processDefinitionCacheMaps);
+ public ProcessInstance handleCommand(Logger logger, String host, Command command) {
+ ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null
if (processInstance == null) {
logger.error("scan command, command parameter is error: {}", command);
@@ -739,7 +739,7 @@ public class ProcessService {
public Tenant getTenantForProcess(int tenantId, int userId) {
Tenant tenant = null;
if (tenantId >= 0) {
- tenant = tenantCacheProcessor.queryById(tenantId);
+ tenant = tenantMapper.queryById(tenantId);
}
if (userId == 0) {
@@ -747,8 +747,8 @@ public class ProcessService {
}
if (tenant == null) {
- User user = userCacheProcessor.selectById(userId);
- tenant = tenantCacheProcessor.queryById(user.getTenantId());
+ User user = userMapper.selectById(userId);
+ tenant = tenantMapper.queryById(user.getTenantId());
}
return tenant;
}
@@ -794,19 +794,12 @@ public class ProcessService {
* @param host host
* @return process instance
*/
- private ProcessInstance constructProcessInstance(Command command, String host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
+ private ProcessInstance constructProcessInstance(Command command, String host) {
ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
- String key = String.format("%d-%d", command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
- if (processDefinitionCacheMaps.containsKey(key)) {
- processDefinition = processDefinitionCacheMaps.get(key);
- } else {
- processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
- if (processDefinition != null) {
- processDefinitionCacheMaps.put(key, processDefinition);
- }
- }
+
+ processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
if (processDefinition == null) {
logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
return null;
@@ -1964,11 +1957,11 @@ public class ProcessService {
return StringUtils.EMPTY;
}
int userId = resourceList.get(0).getUserId();
- User user = userCacheProcessor.selectById(userId);
+ User user = userMapper.selectById(userId);
if (Objects.isNull(user)) {
return StringUtils.EMPTY;
}
- Tenant tenant = tenantCacheProcessor.queryById(user.getTenantId());
+ Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (Objects.isNull(tenant)) {
return StringUtils.EMPTY;
}
@@ -2038,7 +2031,7 @@ public class ProcessService {
if (processInstance == null) {
return queue;
}
- User executor = userCacheProcessor.selectById(processInstance.getExecutorId());
+ User executor = userMapper.selectById(processInstance.getExecutorId());
if (executor != null) {
queue = executor.getQueue();
}
@@ -2149,7 +2142,7 @@ public class ProcessService {
* @return User
*/
public User getUserById(int userId) {
- return userCacheProcessor.selectById(userId);
+ return userMapper.selectById(userId);
}
/**
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
index 83f1f00..04d1f78 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
@@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
-import org.apache.dolphinscheduler.service.cache.service.impl.CacheNotifyServiceImpl;
+import org.apache.dolphinscheduler.service.cache.impl.CacheNotifyServiceImpl;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java
deleted file mode 100644
index ee61564..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Queue;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.impl.QueueCacheProcessorImpl;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-/**
- * tenant cache proxy test
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
-public class QueueCacheProcessorTest {
-
- @Rule
- public final ExpectedException exception = ExpectedException.none();
-
- @InjectMocks
- private QueueCacheProcessorImpl queueCacheProcessor;
-
- @Before
- public void before() {
- PowerMockito.mockStatic(SpringApplicationContext.class);
- PowerMockito.when(SpringApplicationContext.getBean(QueueCacheProcessor.class)).thenReturn(queueCacheProcessor);
- }
-
- @Test
- public void testCacheExpire() {
- Queue queue = new Queue();
- queue.setId(100);
- queueCacheProcessor.cacheExpire(Queue.class, JSONUtils.toJsonString(queue));
- }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java
deleted file mode 100644
index 4ecc970..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.Tenant;
-import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-/**
- * tenant cache proxy test
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
-public class TenantCacheProcessorTest {
-
- @Rule
- public final ExpectedException exception = ExpectedException.none();
-
- @InjectMocks
- private TenantCacheProcessorImpl tenantCacheProcessor;
-
- @Mock
- private TenantMapper tenantMapper;
-
- @Before
- public void before() {
- PowerMockito.mockStatic(SpringApplicationContext.class);
- PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor);
- }
-
- @Test
- public void testQueryById() {
- Tenant tenant1 = new Tenant();
- tenant1.setId(100);
- tenant1.setDescription("test1");
-
- Mockito.when(tenantMapper.queryById(100)).thenReturn(tenant1);
- Assert.assertEquals(tenant1, tenantCacheProcessor.queryById(100));
- }
-
- @Test
- public void testCacheExpire() {
- Tenant tenant1 = new Tenant();
- tenant1.setId(100);
- tenant1.setDescription("test1");
- tenantCacheProcessor.cacheExpire(Tenant.class, JSONUtils.toJsonString(tenant1));
- }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java
deleted file mode 100644
index f786f43..0000000
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.service.cache.processor;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.dao.entity.User;
-import org.apache.dolphinscheduler.dao.mapper.UserMapper;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-/**
- * tenant cache proxy test
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({SpringApplicationContext.class})
-public class UserCacheProxyTest {
-
- @Rule
- public final ExpectedException exception = ExpectedException.none();
-
- @InjectMocks
- private UserCacheProcessorImpl userCacheProcessor;
-
- @Mock
- private UserMapper userMapper;
-
- @Before
- public void before() {
- PowerMockito.mockStatic(SpringApplicationContext.class);
- PowerMockito.when(SpringApplicationContext.getBean(UserCacheProcessor.class)).thenReturn(userCacheProcessor);
- }
-
- @Test
- public void testQueryById() {
- User user1 = new User();
- user1.setId(100);
-
- Mockito.when(userMapper.selectById(100)).thenReturn(user1);
- Assert.assertEquals(user1, userCacheProcessor.selectById(100));
- }
-
- @Test
- public void testCacheExpire() {
- User user = new User();
- user.setId(100);
- userCacheProcessor.cacheExpire(User.class, JSONUtils.toJsonString(user));
- }
-}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 2117b86..510e3a0 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -64,7 +64,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
-import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl;
+import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
@@ -116,7 +116,7 @@ public class ProcessServiceTest {
@Mock
private ProcessInstanceMapper processInstanceMapper;
@Mock
- private UserCacheProcessorImpl userCacheProcessor;
+ private UserMapper userMapper;
@Mock
private TaskInstanceMapper taskInstanceMapper;
@Mock
@@ -134,8 +134,6 @@ public class ProcessServiceTest {
@Mock
private TaskGroupQueueMapper taskGroupQueueMapper;
- private HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new HashMap<>();
-
@Test
public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance();
@@ -263,7 +261,7 @@ public class ProcessServiceTest {
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
- Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps));
+ Assert.assertNull(processService.handleCommand(logger, host, command));
int definitionVersion = 1;
long definitionCode = 123;
@@ -298,7 +296,7 @@ public class ProcessServiceTest {
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
- Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
+ Assert.assertNotNull(processService.handleCommand(logger, host, command1));
Command command2 = new Command();
command2.setId(2);
@@ -308,7 +306,7 @@ public class ProcessServiceTest {
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
- Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps));
+ Assert.assertNotNull(processService.handleCommand(logger, host, command2));
Command command3 = new Command();
command3.setId(3);
@@ -318,7 +316,7 @@ public class ProcessServiceTest {
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
- Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps));
+ Assert.assertNotNull(processService.handleCommand(logger, host, command3));
Command command4 = new Command();
command4.setId(4);
@@ -328,7 +326,7 @@ public class ProcessServiceTest {
command4.setCommandType(CommandType.REPEAT_RUNNING);
command4.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
- Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps));
+ Assert.assertNotNull(processService.handleCommand(logger, host, command4));
Command command5 = new Command();
command5.setId(5);
@@ -342,7 +340,7 @@ public class ProcessServiceTest {
command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
- ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps);
+ ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
ProcessDefinition processDefinition1 = new ProcessDefinition();
@@ -367,7 +365,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
- Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
+ Assert.assertNotNull(processService.handleCommand(logger, host, command1));
Command command6 = new Command();
command6.setId(6);
@@ -378,7 +376,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
- ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6, processDefinitionCacheMaps);
+ ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6);
Assert.assertTrue(processInstance6 != null);
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
@@ -397,7 +395,7 @@ public class ProcessServiceTest {
command7.setProcessDefinitionVersion(1);
Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
- ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7, processDefinitionCacheMaps);
+ ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7);
Assert.assertTrue(processInstance8 == null);
ProcessDefinition processDefinition2 = new ProcessDefinition();
@@ -421,7 +419,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
- ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9, processDefinitionCacheMaps);
+ ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9);
Assert.assertTrue(processInstance10 == null);
}
@@ -462,14 +460,14 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
// will throw exception when command id is 0 and delete fail
- processService.handleCommand(logger, host, command1, processDefinitionCacheMaps);
+ processService.handleCommand(logger, host, command1);
}
@Test
public void testGetUserById() {
User user = new User();
user.setId(123);
- Mockito.when(userCacheProcessor.selectById(123)).thenReturn(user);
+ Mockito.when(userMapper.selectById(123)).thenReturn(user);
Assert.assertEquals(user, processService.getUserById(123));
}