You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ji...@apache.org on 2022/09/17 03:28:11 UTC
[dolphinscheduler] branch dev updated: [Improvement-11773][api] Optimize the log printing of the api module according… (#11782)
This is an automated email from the ASF dual-hosted git repository.
jinyleechina pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1a7c6eabf3 [Improvement-11773][api] Optimize the log printing of the api module according… (#11782)
1a7c6eabf3 is described below
commit 1a7c6eabf3130c9d1836694e0474733bd5c3f9d5
Author: sgw <56...@users.noreply.github.com>
AuthorDate: Sat Sep 17 11:28:04 2022 +0800
[Improvement-11773][api] Optimize the log printing of the api module according… (#11782)
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Resolve conflict.
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- fix ProcessDefinitionServiceImpl.java dependency
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Accept proposed change in ResourcePermissionCheckServiceImpl.java
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Update the UT case in TenantServiceTest.java, UsersServiceTest.java, TaskGroupServiceTest.java.
- Update log printing in TaskGroupServiceImpl.java.
- Fix NPE of log parameter in ExecutorServiceImpl.java when call python api.
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Fix NPE of log in WorkerGroupServiceImpl.java.
- Fix log parameter missing bug in ResourcesServiceImpl.java.
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Fix bugs and address some vulnerable code.
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Fix bugs and address some vulnerable code.
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Fix e2e User test.
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Fix codeQL check & SonarCloud Code Analysis.
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Resolve import conflict.
* [DS-11773][api] Optimize the log printing of the api module according to the log specification doc.
- Fix codeQL check & SonarCloud Code Analysis.
---
.../api/audit/AuditPublishService.java | 6 +-
.../api/controller/ExecutorController.java | 16 +-
.../api/controller/ProjectController.java | 5 +
.../api/controller/TenantController.java | 2 +-
.../dolphinscheduler/api/k8s/K8sManager.java | 2 +-
.../api/permission/PermissionCheck.java | 8 +-
.../ResourcePermissionCheckServiceImpl.java | 6 +-
.../api/security/impl/AbstractAuthenticator.java | 5 +-
.../api/service/impl/AccessTokenServiceImpl.java | 4 +-
.../api/service/impl/AlertGroupServiceImpl.java | 13 +-
.../impl/AlertPluginInstanceServiceImpl.java | 15 +-
.../api/service/impl/BaseServiceImpl.java | 6 +-
.../api/service/impl/ClusterServiceImpl.java | 24 +-
.../api/service/impl/DataAnalysisServiceImpl.java | 1 +
.../api/service/impl/DataSourceServiceImpl.java | 33 ++-
.../service/impl/DqExecuteResultServiceImpl.java | 5 +
.../api/service/impl/DqRuleServiceImpl.java | 2 +-
.../api/service/impl/EnvironmentServiceImpl.java | 58 +++--
.../api/service/impl/ExecutorServiceImpl.java | 93 ++++++--
.../api/service/impl/K8SNamespaceServiceImpl.java | 30 ++-
.../api/service/impl/LoggerServiceImpl.java | 6 +-
.../service/impl/ProcessDefinitionServiceImpl.java | 246 ++++++++++++++-------
.../service/impl/ProcessInstanceServiceImpl.java | 60 ++++-
.../impl/ProcessTaskRelationServiceImpl.java | 52 ++++-
.../api/service/impl/ProjectServiceImpl.java | 59 +++--
.../api/service/impl/QueueServiceImpl.java | 5 +-
.../api/service/impl/ResourcesServiceImpl.java | 179 +++++++++------
.../api/service/impl/SchedulerServiceImpl.java | 53 +++--
.../api/service/impl/SessionServiceImpl.java | 4 +-
.../service/impl/TaskDefinitionServiceImpl.java | 90 ++++++--
.../api/service/impl/TaskGroupServiceImpl.java | 36 ++-
.../api/service/impl/TaskInstanceServiceImpl.java | 13 +-
.../api/service/impl/TenantServiceImpl.java | 32 ++-
.../api/service/impl/UdfFuncServiceImpl.java | 19 +-
.../api/service/impl/UiPluginServiceImpl.java | 7 +
.../api/service/impl/UsersServiceImpl.java | 82 +++++--
.../service/impl/WorkFlowLineageServiceImpl.java | 7 +
.../api/service/impl/WorkerGroupServiceImpl.java | 13 +-
.../dolphinscheduler/api/utils/FileUtils.java | 3 +-
.../api/service/TaskGroupServiceTest.java | 1 +
.../api/service/TenantServiceTest.java | 2 +
.../api/service/UsersServiceTest.java | 5 +-
.../datasource/api/utils/DataSourceUtils.java | 2 +-
43 files changed, 978 insertions(+), 332 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java
index 0c7af62fe9..dd9b0847a9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java
@@ -63,7 +63,7 @@ public class AuditPublishService {
*/
public void publish(AuditMessage message) {
if (auditConfiguration.getEnabled() && !auditMessageQueue.offer(message)) {
- logger.error("add audit message failed {}", message);
+ logger.error("Publish audit message failed, message:{}", message);
}
}
@@ -79,11 +79,11 @@ public class AuditPublishService {
try {
subscriber.execute(message);
} catch (Exception e) {
- logger.error("consume audit message {} failed, error detail {}", message, e);
+ logger.error("Consume audit message failed, message:{}", message, e);
}
}
} catch (InterruptedException e) {
- logger.error("consume audit message failed {}.", message, e);
+ logger.error("Consume audit message failed, message:{}", message, e);
Thread.currentThread().interrupt();
break;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index bd17449940..177a8cabb5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -234,6 +234,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {
if (timeout == null) {
+ logger.debug("Parameter timeout set to {} due to null.", Constants.MAX_TASK_TIMEOUT);
timeout = Constants.MAX_TASK_TIMEOUT;
}
@@ -243,6 +244,7 @@ public class ExecutorController extends BaseController {
}
if (complementDependentMode == null) {
+ logger.debug("Parameter complementDependentMode set to {} due to null.", ComplementDependentMode.OFF_MODE);
complementDependentMode = ComplementDependentMode.OFF_MODE;
}
@@ -261,8 +263,10 @@ public class ExecutorController extends BaseController {
complementDependentMode);
if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
+ logger.error("Process definition start failed, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode);
startFailedProcessDefinitionCodeList.add(String.valueOf(processDefinitionCode));
- }
+ } else
+ logger.info("Start process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode);
}
if (!startFailedProcessDefinitionCodeList.isEmpty()) {
@@ -294,7 +298,9 @@ public class ExecutorController extends BaseController {
public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processInstanceId") Integer processInstanceId,
- @RequestParam("executeType") ExecuteType executeType) {
+ @RequestParam("executeType") ExecuteType executeType
+ ) {
+ logger.info("Start to execute process instance, projectCode:{}, processInstanceId:{}.", projectCode, processInstanceId);
Map<String, Object> result = execService.execute(loginUser, projectCode, processInstanceId, executeType);
return returnDataList(result);
}
@@ -333,9 +339,10 @@ public class ExecutorController extends BaseController {
Map<String, Object> singleResult =
execService.execute(loginUser, projectCode, processInstanceId, executeType);
if (!Status.SUCCESS.equals(singleResult.get(Constants.STATUS))) {
+ logger.error("Start to execute process instance error, projectCode:{}, processInstanceId:{}.", projectCode, processInstanceId);
executeFailedIdList.add((String) singleResult.get(Constants.MSG));
- logger.error((String) singleResult.get(Constants.MSG));
- }
+ } else
+ logger.info("Start to execute process instance complete, projectCode:{}, processInstanceId:{}.", projectCode, processInstanceId);
} catch (Exception e) {
executeFailedIdList
.add(MessageFormat.format(Status.PROCESS_INSTANCE_ERROR.getMsg(), strProcessInstanceId));
@@ -428,6 +435,7 @@ public class ExecutorController extends BaseController {
startParamMap = JSONUtils.toMap(startParams);
}
+ logger.info("Start to execute stream task instance, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.", projectCode, code, version);
Map<String, Object> result = execService.execStreamTaskInstance(loginUser, projectCode, code, version,
warningGroupId, workerGroup, environmentCode, startParamMap, dryRun);
return returnDataList(result);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
index a4fb207723..e957a379f5 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
@@ -37,6 +37,8 @@ import org.apache.dolphinscheduler.dao.entity.User;
import springfox.documentation.annotations.ApiIgnore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
@@ -63,6 +65,8 @@ import io.swagger.annotations.ApiOperation;
@RequestMapping("projects")
public class ProjectController extends BaseController {
+ private static final Logger logger = LoggerFactory.getLogger(ProjectController.class);
+
@Autowired
private ProjectService projectService;
@@ -163,6 +167,7 @@ public class ProjectController extends BaseController {
Result result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) {
+ logger.warn("Pagination parameters check failed, pageNo:{}, pageSize:{}", pageNo, pageSize);
return result;
}
searchVal = ParameterUtils.handleEscapes(searchVal);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TenantController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TenantController.java
index e45e08f95d..73a9ee55f1 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TenantController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TenantController.java
@@ -199,7 +199,7 @@ public class TenantController extends BaseController {
*
* @param loginUser login user
* @param tenantCode tenant code
- * @return true if tenant code can user, otherwise return false
+ * @return true if tenant code can use, otherwise return false
*/
@ApiOperation(value = "verifyTenantCode", notes = "VERIFY_TENANT_CODE_NOTES")
@ApiImplicitParams({
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/k8s/K8sManager.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/k8s/K8sManager.java
index 52b7123cd8..88309cdb10 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/k8s/K8sManager.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/k8s/K8sManager.java
@@ -123,7 +123,7 @@ public class K8sManager {
Config config = Config.fromKubeconfig(configYaml);
return new DefaultKubernetesClient(config);
} catch (Exception e) {
- logger.error("fail to get k8s ApiClient", e);
+ logger.error("Fail to get k8s ApiClient", e);
throw new RemotingException("fail to get k8s ApiClient:" + e.getMessage());
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/PermissionCheck.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/PermissionCheck.java
index e6dd26e358..41d69e590e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/PermissionCheck.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/PermissionCheck.java
@@ -168,15 +168,17 @@ public class PermissionCheck<T> {
// get user type in order to judge whether the user is admin
User user = processService.getUserById(userId);
if (user == null) {
- logger.error("user id {} doesn't exist", userId);
+ logger.error("User does not exist, userId:{}.", userId);
throw new ServiceException(String.format("user %s doesn't exist", userId));
}
if (user.getUserType() != UserType.ADMIN_USER) {
List<T> unauthorizedList = processService.listUnauthorized(userId, needChecks, authorizationType);
// if exist unauthorized resource
if (CollectionUtils.isNotEmpty(unauthorizedList)) {
- logger.error("user {} doesn't have permission of {}: {}", user.getUserName(), authorizationType.getDescp(), unauthorizedList);
- throw new ServiceException(String.format("user %s doesn't have permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
+ logger.error("User does not have {} permission for {}, userName:{}.",
+ authorizationType.getDescp(), unauthorizedList, user.getUserName());
+ throw new ServiceException(String.format("user %s doesn't have permission of %s %s",
+ user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
}
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/ResourcePermissionCheckServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/ResourcePermissionCheckServiceImpl.java
index e013603bd2..1ebd187369 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/ResourcePermissionCheckServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/ResourcePermissionCheckServiceImpl.java
@@ -107,6 +107,8 @@ public class ResourcePermissionCheckServiceImpl implements ResourcePermissionChe
Set<?> originResSet = new HashSet<>(Arrays.asList(needChecks));
Set<?> ownResSets = RESOURCE_LIST_MAP.get(authorizationType).listAuthorizedResource(userId, logger);
originResSet.removeAll(ownResSets);
+ if (CollectionUtils.isNotEmpty(originResSet))
+ logger.warn("User does not have resource permission on associated resources, userId:{}", userId);
return originResSet.isEmpty();
}
return true;
@@ -116,7 +118,7 @@ public class ResourcePermissionCheckServiceImpl implements ResourcePermissionChe
public boolean operationPermissionCheck(Object authorizationType, Object[] projectIds, Integer userId, String permissionKey, Logger logger) {
User user = processService.getUserById(userId);
if (user == null) {
- logger.error("user id {} doesn't exist", userId);
+ logger.error("User does not exist, userId:{}.", userId);
return false;
}
if (user.getUserType().equals(UserType.ADMIN_USER)) {
@@ -139,7 +141,7 @@ public class ResourcePermissionCheckServiceImpl implements ResourcePermissionChe
public Set<Object> userOwnedResourceIdsAcquisition(Object authorizationType, Integer userId, Logger logger) {
User user = processService.getUserById(userId);
if (user == null) {
- logger.error("user id {} doesn't exist", userId);
+ logger.error("User does not exist, userId:{}.", userId);
return Collections.emptySet();
}
return (Set<Object>) RESOURCE_LIST_MAP.get(authorizationType).listAuthorizedResource(
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/impl/AbstractAuthenticator.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/impl/AbstractAuthenticator.java
index 17fda8053e..8512aaf248 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/impl/AbstractAuthenticator.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/impl/AbstractAuthenticator.java
@@ -64,6 +64,7 @@ public abstract class AbstractAuthenticator implements Authenticator {
Result<Map<String, String>> result = new Result<>();
User user = login(userId, password, extra);
if (user == null) {
+ logger.error("Username or password entered incorrectly.");
result.setCode(Status.USER_NAME_PASSWD_ERROR.getCode());
result.setMsg(Status.USER_NAME_PASSWD_ERROR.getMsg());
return result;
@@ -71,6 +72,7 @@ public abstract class AbstractAuthenticator implements Authenticator {
// check user state
if (user.getState() == Flag.NO.ordinal()) {
+ logger.error("The current user is deactivated, userName:{}.", user.getUserName());
result.setCode(Status.USER_DISABLED.getCode());
result.setMsg(Status.USER_DISABLED.getMsg());
return result;
@@ -79,12 +81,13 @@ public abstract class AbstractAuthenticator implements Authenticator {
// create session
String sessionId = sessionService.createSession(user, extra);
if (sessionId == null) {
+ logger.error("Failed to create session, userName:{}.", user.getUserName());
result.setCode(Status.LOGIN_SESSION_FAILED.getCode());
result.setMsg(Status.LOGIN_SESSION_FAILED.getMsg());
return result;
}
- logger.info("sessionId : {}", sessionId);
+ logger.info("Session is created and sessionId is :{}.", sessionId);
Map<String, String> data = new HashMap<>();
data.put(Constants.SESSION_ID, sessionId);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
index 141f9cd672..42b56a0909 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
@@ -197,7 +197,7 @@ public class AccessTokenServiceImpl extends BaseServiceImpl implements AccessTok
AccessToken accessToken = accessTokenMapper.selectById(id);
if (accessToken == null) {
- logger.error("access token not exist, access token id {}", id);
+ logger.error("Access token does not exist, accessTokenId:{}.", id);
putMsg(result, Status.ACCESS_TOKEN_NOT_EXIST);
return result;
}
@@ -234,7 +234,7 @@ public class AccessTokenServiceImpl extends BaseServiceImpl implements AccessTok
// 2. check if token is existed
AccessToken accessToken = accessTokenMapper.selectById(id);
if (accessToken == null) {
- logger.error("access token not exist, access token id {}", id);
+ logger.error("Access token does not exist, accessTokenId:{}.", id);
putMsg(result, Status.ACCESS_TOKEN_NOT_EXIST);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AlertGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AlertGroupServiceImpl.java
index e683f5f271..2b528acf46 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AlertGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AlertGroupServiceImpl.java
@@ -169,6 +169,7 @@ public class AlertGroupServiceImpl extends BaseServiceImpl implements AlertGroup
return result;
}
if(checkDescriptionLength(desc)){
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -189,11 +190,13 @@ public class AlertGroupServiceImpl extends BaseServiceImpl implements AlertGroup
result.put(Constants.DATA_LIST, alertGroup);
putMsg(result, Status.SUCCESS);
permissionPostHandle(AuthorizationType.ALERT_GROUP, loginUser.getId(), Collections.singletonList(alertGroup.getId()), logger);
+ logger.info("Create alert group complete, groupName:{}", alertGroup.getGroupName());
} else {
+ logger.error("Create alert group error, groupName:{}", alertGroup.getGroupName());
putMsg(result, Status.CREATE_ALERT_GROUP_ERROR);
}
} catch (DuplicateKeyException ex) {
- logger.error("Create alert group error.", ex);
+ logger.error("Create alert group error, groupName:{}", alertGroup.getGroupName(), ex);
putMsg(result, Status.ALERT_GROUP_EXIST);
}
@@ -219,12 +222,14 @@ public class AlertGroupServiceImpl extends BaseServiceImpl implements AlertGroup
return result;
}
if(checkDescriptionLength(desc)){
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
AlertGroup alertGroup = alertGroupMapper.selectById(id);
if (alertGroup == null) {
+ logger.error("Alert group does not exist, id:{}.", id);
putMsg(result, Status.ALERT_GROUP_NOT_EXIST);
return result;
@@ -241,9 +246,10 @@ public class AlertGroupServiceImpl extends BaseServiceImpl implements AlertGroup
alertGroup.setAlertInstanceIds(alertInstanceIds);
try {
alertGroupMapper.updateById(alertGroup);
+ logger.info("Update alert group complete, groupName:{}", alertGroup.getGroupName());
putMsg(result, Status.SUCCESS);
} catch (DuplicateKeyException ex) {
- logger.error("Update alert group error.", ex);
+ logger.error("Update alert group error, groupName:{}", alertGroup.getGroupName(), ex);
putMsg(result, Status.ALERT_GROUP_EXIST);
}
return result;
@@ -270,6 +276,7 @@ public class AlertGroupServiceImpl extends BaseServiceImpl implements AlertGroup
// Not allow to delete the default alarm group ,because the module of service need to use it.
if (id == 1) {
+ logger.warn("Not allow to delete the default alarm group.");
putMsg(result, Status.NOT_ALLOW_TO_DELETE_DEFAULT_ALARM_GROUP);
return result;
}
@@ -277,11 +284,13 @@ public class AlertGroupServiceImpl extends BaseServiceImpl implements AlertGroup
//check exist
AlertGroup alertGroup = alertGroupMapper.selectById(id);
if (alertGroup == null) {
+ logger.error("Alert group does not exist, id:{}.", id);
putMsg(result, Status.ALERT_GROUP_NOT_EXIST);
return result;
}
alertGroupMapper.deleteById(id);
+ logger.info("Delete alert group complete, groupId:{}", id);
putMsg(result, Status.SUCCESS);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AlertPluginInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AlertPluginInstanceServiceImpl.java
index d74f2329f0..ee95249209 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AlertPluginInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AlertPluginInstanceServiceImpl.java
@@ -46,6 +46,8 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@@ -62,6 +64,8 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
@Lazy
public class AlertPluginInstanceServiceImpl extends BaseServiceImpl implements AlertPluginInstanceService {
+ private static final Logger logger = LoggerFactory.getLogger(AlertPluginInstanceServiceImpl.class);
+
@Autowired
private AlertPluginInstanceMapper alertPluginInstanceMapper;
@@ -93,16 +97,19 @@ public class AlertPluginInstanceServiceImpl extends BaseServiceImpl implements A
return result;
}
if (alertPluginInstanceMapper.existInstanceName(alertPluginInstance.getInstanceName()) == Boolean.TRUE) {
+ logger.error("Plugin instance with the same name already exists, name:{}.", alertPluginInstance.getInstanceName());
putMsg(result, Status.PLUGIN_INSTANCE_ALREADY_EXIT);
return result;
}
int i = alertPluginInstanceMapper.insert(alertPluginInstance);
if (i > 0) {
+ logger.info("Create alert plugin instance complete, name:{}", alertPluginInstance.getInstanceName());
result.put(Constants.DATA_LIST, alertPluginInstance);
putMsg(result, Status.SUCCESS);
return result;
}
+ logger.error("Create alert plugin instance error, name:{}", alertPluginInstance.getInstanceName());
putMsg(result, Status.SAVE_ERROR);
return result;
}
@@ -130,9 +137,13 @@ public class AlertPluginInstanceServiceImpl extends BaseServiceImpl implements A
int i = alertPluginInstanceMapper.updateById(alertPluginInstance);
if (i > 0) {
+ logger.info("Update alert plugin instance complete, instanceId:{}, name:{}", alertPluginInstance.getId(),
+ alertPluginInstance.getInstanceName());
putMsg(result, Status.SUCCESS);
return result;
}
+ logger.error("Update alert plugin instance error, instanceId:{}, name:{}", alertPluginInstance.getId(),
+ alertPluginInstance.getInstanceName());
putMsg(result, Status.SAVE_ERROR);
return result;
}
@@ -150,6 +161,7 @@ public class AlertPluginInstanceServiceImpl extends BaseServiceImpl implements A
//check if there is an associated alert group
boolean hasAssociatedAlertGroup = checkHasAssociatedAlertGroup(String.valueOf(id));
if (hasAssociatedAlertGroup) {
+ logger.warn("Delete alert plugin failed because alert group is using it, pluginId:{}.", id);
putMsg(result, Status.DELETE_ALERT_PLUGIN_INSTANCE_ERROR_HAS_ALERT_GROUP_ASSOCIATED);
return result;
}
@@ -160,9 +172,10 @@ public class AlertPluginInstanceServiceImpl extends BaseServiceImpl implements A
int i = alertPluginInstanceMapper.deleteById(id);
if (i > 0) {
+ logger.info("Delete alert plugin instance complete, instanceId:{}", id);
putMsg(result, Status.SUCCESS);
}
-
+ logger.error("Delete alert plugin instance error, instanceId:{}", id);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java
index 5c616a7c33..d8d24925b2 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java
@@ -53,8 +53,8 @@ public class BaseServiceImpl implements BaseService {
try{
resourcePermissionCheckService.postHandle(authorizationType, userId, ids, logger);
}catch (Exception e){
- logger.error("post handle error", e);
- throw new RuntimeException("resource association user error", e);
+ logger.error("Post handle error, userId:{}.", userId, e);
+ throw new RuntimeException("Resource association user error", e);
}
}
@@ -193,6 +193,7 @@ public class BaseServiceImpl implements BaseService {
if (!StringUtils.isEmpty(startDateStr)) {
start = DateUtils.stringToDate(startDateStr);
if (Objects.isNull(start)) {
+ logger.warn("Parameter startDateStr is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result;
}
@@ -203,6 +204,7 @@ public class BaseServiceImpl implements BaseService {
if (!StringUtils.isEmpty(endDateStr)) {
end = DateUtils.stringToDate(endDateStr);
if (Objects.isNull(end)) {
+ logger.warn("Parameter endDateStr is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ClusterServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ClusterServiceImpl.java
index 288fb8033f..c1881e2c24 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ClusterServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ClusterServiceImpl.java
@@ -84,6 +84,7 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
public Map<String, Object> createCluster(User loginUser, String name, String config, String desc) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
+ logger.warn("Only admin can create cluster, current login user name:{}.", loginUser.getUserName());
return result;
}
@@ -94,6 +95,7 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
Cluster clusterExistByName = clusterMapper.queryByClusterName(name);
if (clusterExistByName != null) {
+ logger.warn("Cluster with the same name already exists, clusterName:{}.", clusterExistByName.getName());
putMsg(result, Status.CLUSTER_NAME_EXISTS, name);
return result;
}
@@ -110,7 +112,7 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
code = CodeGenerateUtils.getInstance().genCode();
cluster.setCode(code);
} catch (CodeGenerateException e) {
- logger.error("Cluster code get error, ", e);
+ logger.error("Generate cluster code error.", e);
}
if (code == 0L) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating cluster code");
@@ -118,9 +120,11 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
}
if (clusterMapper.insert(cluster) > 0) {
+ logger.info("Cluster create complete, clusterName:{}.", cluster.getName());
result.put(Constants.DATA_LIST, cluster.getCode());
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Cluster create error, clusterName:{}.", cluster.getName());
putMsg(result, Status.CREATE_CLUSTER_ERROR);
}
return result;
@@ -223,6 +227,7 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
Cluster cluster = clusterMapper.queryByClusterName(name);
if (cluster == null) {
+ logger.warn("Cluster does not exist, name:{}.", name);
putMsg(result, Status.QUERY_CLUSTER_BY_NAME_ERROR, name);
} else {
@@ -245,6 +250,7 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
public Map<String, Object> deleteClusterByCode(User loginUser, Long code) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
+ logger.warn("Only admin can delete cluster, current login user name:{}.", loginUser.getUserName());
return result;
}
@@ -252,19 +258,23 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
.selectCount(new QueryWrapper<K8sNamespace>().lambda().eq(K8sNamespace::getClusterCode, code));
if (relatedNamespaceNumber > 0) {
+ logger.warn("Delete cluster failed because {} namespace(s) is(are) using it, clusterCode:{}.", relatedNamespaceNumber, code);
putMsg(result, Status.DELETE_CLUSTER_RELATED_NAMESPACE_EXISTS);
return result;
}
int delete = clusterMapper.deleteByCode(code);
if (delete > 0) {
+ logger.info("Delete cluster complete, clusterCode:{}.", code);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Delete cluster error, clusterCode:{}.", code);
putMsg(result, Status.DELETE_CLUSTER_ERROR);
}
return result;
}
+
/**
* update cluster
*
@@ -279,10 +289,12 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
public Map<String, Object> updateClusterByCode(User loginUser, Long code, String name, String config, String desc) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
+ logger.warn("Only admin can update cluster, current login user name:{}.", loginUser.getUserName());
return result;
}
if (checkDescriptionLength(desc)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -294,12 +306,14 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
Cluster clusterExistByName = clusterMapper.queryByClusterName(name);
if (clusterExistByName != null && !clusterExistByName.getCode().equals(code)) {
+ logger.warn("Cluster with the same name already exists, name:{}.", clusterExistByName.getName());
putMsg(result, Status.CLUSTER_NAME_EXISTS, name);
return result;
}
Cluster clusterExist = clusterMapper.queryByClusterCode(code);
if (clusterExist == null) {
+ logger.error("Cluster does not exist, code:{}.", code);
putMsg(result, Status.CLUSTER_NOT_EXISTS, name);
return result;
}
@@ -309,6 +323,7 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
try {
k8sManager.getAndUpdateK8sClient(code, true);
} catch (RemotingException e) {
+ logger.error("Update K8s error.", e);
putMsg(result, Status.K8S_CLIENT_OPS_ERROR, name);
return result;
}
@@ -320,7 +335,7 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
clusterExist.setDescription(desc);
clusterMapper.updateById(clusterExist);
// need not update relation
-
+ logger.info("Cluster update complete, clusterId:{}.", clusterExist.getId());
putMsg(result, Status.SUCCESS);
return result;
}
@@ -336,12 +351,14 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
Map<String, Object> result = new HashMap<>();
if (StringUtils.isEmpty(clusterName)) {
+ logger.warn("Parameter cluster name is empty.");
putMsg(result, Status.CLUSTER_NAME_IS_NULL);
return result;
}
Cluster cluster = clusterMapper.queryByClusterName(clusterName);
if (cluster != null) {
+ logger.warn("Cluster with the same name already exists, name:{}.", cluster.getName());
putMsg(result, Status.CLUSTER_NAME_EXISTS, clusterName);
return result;
}
@@ -353,10 +370,12 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
public Map<String, Object> checkParams(String name, String config) {
Map<String, Object> result = new HashMap<>();
if (StringUtils.isEmpty(name)) {
+ logger.warn("Parameter cluster name is empty.");
putMsg(result, Status.CLUSTER_NAME_IS_NULL);
return result;
}
if (StringUtils.isEmpty(config)) {
+ logger.warn("Parameter cluster config is empty.");
putMsg(result, Status.CLUSTER_CONFIG_IS_NULL);
return result;
}
@@ -365,3 +384,4 @@ public class ClusterServiceImpl extends BaseServiceImpl implements ClusterServic
}
}
+
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
index c7222c2a0d..276e1ddb42 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
@@ -168,6 +168,7 @@ public class DataAnalysisServiceImpl extends BaseServiceImpl implements DataAnal
start = DateUtils.stringToDate(startDate);
end = DateUtils.stringToDate(endDate);
if (Objects.isNull(start) || Objects.isNull(end)) {
+ logger.warn("Parameter startDate or endDate is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
index 4080d6e3c6..98d876b4e7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
@@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.DataSourceService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
@@ -111,10 +112,12 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
}
// check name can use or not
if (checkName(datasourceParam.getName())) {
+ logger.warn("Datasource with the same name already exists, name:{}.", datasourceParam.getName());
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
if(checkDescriptionLength(datasourceParam.getNote())){
+ logger.warn("Parameter description is too long, description:{}.", datasourceParam.getNote());
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -142,8 +145,9 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
dataSourceMapper.insert(dataSource);
putMsg(result, Status.SUCCESS);
permissionPostHandle(AuthorizationType.DATASOURCE, loginUser.getId(), Collections.singletonList(dataSource.getId()), logger);
+ logger.info("Datasource create complete, dbType:{}, datasourceName:{}.", dataSource.getType().getDescp(), dataSource.getName());
} catch (DuplicateKeyException ex) {
- logger.error("Create datasource error.", ex);
+ logger.error("Datasource create error.", ex);
putMsg(result, Status.DATASOURCE_EXIST);
}
@@ -164,6 +168,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
// determine whether the data source exists
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
+ logger.error("Datasource does not exist, id:{}.", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -175,10 +180,12 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
//check name can use or not
if (!dataSource.getName().trim().equals(dataSource.getName()) && checkName(dataSource.getName())) {
+ logger.warn("Datasource with the same name already exists, name:{}.", dataSource.getName());
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
if(checkDescriptionLength(dataSourceParam.getNote())){
+ logger.warn("Parameter description is too long, description:{}.", dataSourceParam.getNote());
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -206,9 +213,10 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
dataSource.setUpdateTime(now);
try {
dataSourceMapper.updateById(dataSource);
+ logger.info("Update datasource complete, datasourceId:{}, datasourceName:{}.", dataSource.getId(), dataSource.getName());
putMsg(result, Status.SUCCESS);
} catch (DuplicateKeyException ex) {
- logger.error("Update datasource error.", ex);
+ logger.error("Update datasource error, datasourceId:{}, datasourceName:{}.", dataSource.getId(), dataSource.getName(), ex); // trailing Throwable keeps the stack trace
putMsg(result, Status.DATASOURCE_EXIST);
}
return result;
@@ -231,6 +239,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
Map<String, Object> result = new HashMap<>();
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
+ logger.error("Datasource does not exist, id:{}.", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -342,6 +351,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
Result<Object> result = new Result<>();
List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name);
if (dataSourceList != null && !dataSourceList.isEmpty()) {
+ logger.warn("Datasource with the same name already exists, dataSourceName:{}.", dataSourceList.get(0).getName());
putMsg(result, Status.DATASOURCE_EXIST);
} else {
putMsg(result, Status.SUCCESS);
@@ -363,16 +373,18 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
Result<Object> result = new Result<>();
try (Connection connection = DataSourceClientProvider.getInstance().getConnection(type, connectionParam)) {
if (connection == null) {
+ logger.error("Connection test to {} datasource failed, connectionParam:{}.", type.getDescp(), connectionParam);
putMsg(result, Status.CONNECTION_TEST_FAILURE);
return result;
}
+ logger.info("Connection test to {} datasource success, connectionParam:{}", type.getDescp(), connectionParam);
putMsg(result, Status.SUCCESS);
return result;
} catch (Exception e) {
String message = Optional.of(e).map(Throwable::getCause)
.map(Throwable::getMessage)
.orElse(e.getMessage());
- logger.error("datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type, connectionParam, message);
+ logger.error("Datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type, connectionParam, message);
return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(), message);
}
}
@@ -388,6 +400,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) {
Result<Object> result = new Result<>();
+ logger.error("Datasource does not exist, datasourceId:{}.", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -409,7 +422,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
//query datasource by id
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
if (dataSource == null) {
- logger.error("resource id {} not exist", datasourceId);
+ logger.warn("Datasource does not exist, datasourceId:{}.", datasourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -419,10 +432,11 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
}
dataSourceMapper.deleteById(datasourceId);
datasourceUserMapper.deleteByDatasourceId(datasourceId);
+ logger.info("Delete datasource complete, datasourceId:{}.", datasourceId);
putMsg(result, Status.SUCCESS);
} catch (Exception e) {
- logger.error("delete datasource error", e);
- throw new RuntimeException("delete datasource error");
+ logger.error("Delete datasource error, datasourceId:{}.", datasourceId, e); // failure path: must not read "complete"
+ throw new ServiceException(Status.DELETE_DATA_SOURCE_FAILURE);
}
return result;
}
@@ -514,7 +528,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
try {
schema = metaData.getConnection().getSchema();
} catch (SQLException e) {
- logger.error("cant not get the schema : {}", e.getMessage(), e);
+ logger.error("Can not get the schema, datasourceId:{}.", datasourceId, e);
}
tables = metaData.getTables(
@@ -522,6 +536,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
getDbSchemaPattern(dataSource.getType(),schema,connectionParam),
"%", TABLE_TYPES);
if (null == tables) {
+ logger.error("Get datasource tables error, datasourceId:{}.", datasourceId);
putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
return result;
}
@@ -533,7 +548,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
}
} catch (Exception e) {
- logger.error(e.toString(), e);
+ logger.error("Get datasource tables error, datasourceId:{}.", datasourceId, e);
putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
return result;
} finally {
@@ -588,7 +603,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
columnList.add(rs.getString(COLUMN_NAME));
}
} catch (Exception e) {
- logger.error(e.toString(), e);
+ logger.error("Get datasource table columns error, datasourceId:{}.", dataSource.getId(), e);
} finally {
closeResult(rs);
releaseConnection(connection);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java
index 5b82239295..417103f9f3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqExecuteResultServiceImpl.java
@@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Date;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -42,6 +44,8 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@Service
public class DqExecuteResultServiceImpl extends BaseServiceImpl implements DqExecuteResultService {
+ private static final Logger logger = LoggerFactory.getLogger(DqExecuteResultServiceImpl.class); // one shared logger per class, not per instance
+
@Autowired
private DqExecuteResultMapper dqExecuteResultMapper;
@@ -72,6 +76,7 @@ public class DqExecuteResultServiceImpl extends BaseServiceImpl implements DqExe
end = DateUtils.stringToDate(endTime);
}
} catch (Exception e) {
+ logger.warn("Parameter startTime or endTime is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startTime,endTime");
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java
index 931064a041..c1d57e7868 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DqRuleServiceImpl.java
@@ -240,7 +240,7 @@ public class DqRuleServiceImpl extends BaseServiceImpl implements DqRuleService
try {
result = mapper.writeValueAsString(params);
} catch (JsonProcessingException e) {
- logger.error("json parse error : {}", e.getMessage(), e);
+ logger.error("Json parse error.", e);
}
return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
index 8e64113718..0f5efcfa3b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java
@@ -102,6 +102,7 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
return result;
}
if (checkDescriptionLength(desc)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -112,6 +113,7 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
Environment environment = environmentMapper.queryByEnvironmentName(name);
if (environment != null) {
+ logger.warn("Environment with the same name already exists, environmentName:{}.", environment.getName());
putMsg(result, Status.ENVIRONMENT_NAME_EXISTS, name);
return result;
}
@@ -128,7 +130,7 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
code = CodeGenerateUtils.getInstance().genCode();
env.setCode(code);
} catch (CodeGenerateException e) {
- logger.error("Environment code get error, ", e);
+ logger.error("Generate environment code error.", e);
}
if (code == 0L) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating environment code");
@@ -149,15 +151,18 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
relation.setCreateTime(new Date());
relation.setUpdateTime(new Date());
relationMapper.insert(relation);
+ logger.info("Environment-WorkerGroup relation create complete, environmentName:{}, workerGroup:{}.",
+ env.getName(), relation.getWorkerGroup());
}
});
}
}
result.put(Constants.DATA_LIST, env.getCode());
putMsg(result, Status.SUCCESS);
- permissionPostHandle(AuthorizationType.ENVIRONMENT, loginUser.getId(),
- Collections.singletonList(env.getId()), logger);
+ permissionPostHandle(AuthorizationType.ENVIRONMENT, loginUser.getId(), Collections.singletonList(env.getId()), logger);
+ logger.info("Environment create complete, name:{}.", env.getName());
} else {
+ logger.error("Environment create error, name:{}.", env.getName());
putMsg(result, Status.CREATE_ENVIRONMENT_ERROR);
}
return result;
@@ -181,8 +186,7 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
environmentIPage = environmentMapper.queryEnvironmentListPaging(page, searchVal);
} else {
- Set<Integer> ids = resourcePermissionCheckService
- .userOwnedResourceIdsAcquisition(AuthorizationType.ENVIRONMENT, loginUser.getId(), logger);
+ Set<Integer> ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ENVIRONMENT, loginUser.getId(), logger);
if (ids.isEmpty()) {
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
@@ -195,13 +199,12 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
if (CollectionUtils.isNotEmpty(environmentIPage.getRecords())) {
Map<Long, List<String>> relationMap = relationMapper.selectList(null).stream()
- .collect(Collectors.groupingBy(EnvironmentWorkerGroupRelation::getEnvironmentCode,
- Collectors.mapping(EnvironmentWorkerGroupRelation::getWorkerGroup, Collectors.toList())));
+ .collect(Collectors.groupingBy(EnvironmentWorkerGroupRelation::getEnvironmentCode,Collectors.mapping(EnvironmentWorkerGroupRelation::getWorkerGroup,Collectors.toList())));
List<EnvironmentDto> dtoList = environmentIPage.getRecords().stream().map(environment -> {
EnvironmentDto dto = new EnvironmentDto();
- BeanUtils.copyProperties(environment, dto);
- List<String> workerGroups = relationMap.getOrDefault(environment.getCode(), new ArrayList<String>());
+ BeanUtils.copyProperties(environment,dto);
+ List<String> workerGroups = relationMap.getOrDefault(environment.getCode(),new ArrayList<String>());
dto.setWorkerGroups(workerGroups);
return dto;
}).collect(Collectors.toList());
@@ -224,33 +227,31 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
*/
@Override
public Map<String, Object> queryAllEnvironmentList(User loginUser) {
- Map<String, Object> result = new HashMap<>();
- Set<Integer> ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ENVIRONMENT,
- loginUser.getId(), logger);
+ Map<String,Object> result = new HashMap<>();
+ Set<Integer> ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ENVIRONMENT, loginUser.getId(), logger);
if (ids.isEmpty()) {
result.put(Constants.DATA_LIST, Collections.emptyList());
- putMsg(result, Status.SUCCESS);
+ putMsg(result,Status.SUCCESS);
return result;
}
List<Environment> environmentList = environmentMapper.selectBatchIds(ids);
if (CollectionUtils.isNotEmpty(environmentList)) {
Map<Long, List<String>> relationMap = relationMapper.selectList(null).stream()
- .collect(Collectors.groupingBy(EnvironmentWorkerGroupRelation::getEnvironmentCode,
- Collectors.mapping(EnvironmentWorkerGroupRelation::getWorkerGroup, Collectors.toList())));
+ .collect(Collectors.groupingBy(EnvironmentWorkerGroupRelation::getEnvironmentCode,Collectors.mapping(EnvironmentWorkerGroupRelation::getWorkerGroup,Collectors.toList())));
List<EnvironmentDto> dtoList = environmentList.stream().map(environment -> {
EnvironmentDto dto = new EnvironmentDto();
- BeanUtils.copyProperties(environment, dto);
- List<String> workerGroups = relationMap.getOrDefault(environment.getCode(), new ArrayList<String>());
+ BeanUtils.copyProperties(environment,dto);
+ List<String> workerGroups = relationMap.getOrDefault(environment.getCode(),new ArrayList<String>());
dto.setWorkerGroups(workerGroups);
return dto;
}).collect(Collectors.toList());
- result.put(Constants.DATA_LIST, dtoList);
+ result.put(Constants.DATA_LIST,dtoList);
} else {
result.put(Constants.DATA_LIST, new ArrayList<>());
}
- putMsg(result, Status.SUCCESS);
+ putMsg(result,Status.SUCCESS);
return result;
}
@@ -273,7 +274,7 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
.collect(Collectors.toList());
EnvironmentDto dto = new EnvironmentDto();
- BeanUtils.copyProperties(env, dto);
+ BeanUtils.copyProperties(env,dto);
dto.setWorkerGroups(workerGroups);
result.put(Constants.DATA_LIST, dto);
putMsg(result, Status.SUCCESS);
@@ -299,7 +300,7 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
.collect(Collectors.toList());
EnvironmentDto dto = new EnvironmentDto();
- BeanUtils.copyProperties(env, dto);
+ BeanUtils.copyProperties(env,dto);
dto.setWorkerGroups(workerGroups);
result.put(Constants.DATA_LIST, dto);
putMsg(result, Status.SUCCESS);
@@ -326,6 +327,7 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
.selectCount(new QueryWrapper<TaskDefinition>().lambda().eq(TaskDefinition::getEnvironmentCode, code));
if (relatedTaskNumber > 0) {
+ logger.warn("Delete environment failed because {} task(s) is(are) using it, environmentCode:{}.", relatedTaskNumber, code);
putMsg(result, Status.DELETE_ENVIRONMENT_RELATED_TASK_EXISTS);
return result;
}
@@ -335,8 +337,10 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
relationMapper.delete(new QueryWrapper<EnvironmentWorkerGroupRelation>()
.lambda()
.eq(EnvironmentWorkerGroupRelation::getEnvironmentCode, code));
+ logger.info("Environment and relations delete complete, environmentCode:{}.", code);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Environment delete error, environmentCode:{}.", code);
putMsg(result, Status.DELETE_ENVIRONMENT_ERROR);
}
return result;
@@ -367,12 +371,14 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
return checkResult;
}
if (checkDescriptionLength(desc)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
Environment environment = environmentMapper.queryByEnvironmentName(name);
if (environment != null && !environment.getCode().equals(code)) {
+ logger.warn("Environment with the same name already exists, name:{}.", environment.getName());
putMsg(result, Status.ENVIRONMENT_NAME_EXISTS, name);
return result;
}
@@ -430,8 +436,10 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
relationMapper.insert(relation);
}
});
+ logger.info("Environment and relations update complete, environmentId:{}.", env.getId());
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Environment update error, environmentId:{}.", env.getId());
putMsg(result, Status.UPDATE_ENVIRONMENT_ERROR, name);
}
return result;
@@ -448,12 +456,14 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
Map<String, Object> result = new HashMap<>();
if (StringUtils.isEmpty(environmentName)) {
+ logger.warn("Parameter environment name is empty.");
putMsg(result, Status.ENVIRONMENT_NAME_IS_NULL);
return result;
}
Environment environment = environmentMapper.queryByEnvironmentName(environmentName);
if (environment != null) {
+ logger.warn("Environment with the same name already exists, name:{}.", environment.getName());
putMsg(result, Status.ENVIRONMENT_NAME_EXISTS, environmentName);
return result;
}
@@ -474,6 +484,9 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
if (Objects.nonNull(taskDefinitionList) && taskDefinitionList.size() != 0) {
Set<String> collect =
taskDefinitionList.stream().map(TaskDefinition::getName).collect(Collectors.toSet());
+ logger.warn("Environment {} and worker group {} is being used by task {}, so can not update.",
+ taskDefinitionList.get(0).getEnvironmentCode(), taskDefinitionList.get(0).getWorkerGroup(),
+ collect);
putMsg(result, Status.UPDATE_ENVIRONMENT_WORKER_GROUP_RELATION_ERROR, workerGroup, environmentName,
collect);
return result;
@@ -486,10 +499,12 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
public Map<String, Object> checkParams(String name, String config, String workerGroups) {
Map<String, Object> result = new HashMap<>();
if (StringUtils.isEmpty(name)) {
+ logger.warn("Parameter environment name is empty.");
putMsg(result, Status.ENVIRONMENT_NAME_IS_NULL);
return result;
}
if (StringUtils.isEmpty(config)) {
+ logger.warn("Parameter environment config is empty.");
putMsg(result, Status.ENVIRONMENT_CONFIG_IS_NULL);
return result;
}
@@ -497,6 +512,7 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
List<String> workerGroupList = JSONUtils.parseObject(workerGroups, new TypeReference<List<String>>() {
});
if (Objects.isNull(workerGroupList)) {
+ logger.warn("Parameter worker groups list is invalid.");
putMsg(result, Status.ENVIRONMENT_WORKER_GROUPS_IS_INVALID);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index ce1d66683e..420f395382 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -186,6 +186,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
// timeout is invalid
if (timeout <= 0 || timeout > MAX_TASK_TIMEOUT) {
+ logger.warn("Parameter timeout is invalid, timeout:{}.", timeout);
putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
return result;
}
@@ -199,8 +200,8 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
if (!checkTenantSuitable(processDefinition)) {
- logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
- processDefinition.getId(), processDefinition.getName());
+ logger.error("There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
+ processDefinition.getCode(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
}
@@ -227,8 +228,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
+ logger.info("Create command complete, processDefinitionCode:{}, commandCount:{}.", processDefinition.getCode(), create);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Start process instance failed because create command error, processDefinitionCode:{}.", processDefinition.getCode());
putMsg(result, Status.START_PROCESS_INSTANCE_ERROR);
}
return result;
@@ -246,6 +249,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// no master
if (masterServers.isEmpty()) {
+ logger.error("Master does not exist.");
putMsg(result, Status.MASTER_NOT_EXISTS);
return false;
}
@@ -268,6 +272,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (cronMap.containsKey(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
String[] stringDates = cronMap.get(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST).split(COMMA);
if (stringDates.length > SCHEDULE_TIME_MAX_LENGTH) {
+ logger.warn("Parameter cronTime is bigger than {}.", SCHEDULE_TIME_MAX_LENGTH);
return false;
}
}
@@ -289,12 +294,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<String, Object> result = new HashMap<>();
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
// check process definition exists
+ logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefineCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
// check process definition online
+ logger.warn("Process definition is not {}, processDefinitionCode:{}, version:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode, version);
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(processDefineCode), version);
} else if (!checkSubProcessDefinitionValid(processDefinition)) {
// check sub process definition online
+ logger.warn("Subprocess definition of process definition is not {}, processDefinitionCode:{}.", ReleaseState.ONLINE.getDescp(), processDefineCode);
putMsg(result, Status.SUB_PROCESS_DEFINE_NOT_RELEASE);
} else {
result.put(Constants.STATUS, Status.SUCCESS);
@@ -389,7 +397,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
if (!checkTenantSuitable(processDefinition)) {
- logger.error("there is not any valid tenant for the process definition: id:{},name:{}, ",
+ logger.error("There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionName:{}.",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
}
@@ -421,6 +429,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break;
case STOP:
if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
+ logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_STOP.getDesc(), processInstance.getName());
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else {
@@ -431,6 +440,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
break;
case PAUSE:
if (processInstance.getState() == WorkflowExecutionStatus.READY_PAUSE) {
+ logger.warn("Process instance status is already {}, processInstanceName:{}.", WorkflowExecutionStatus.READY_PAUSE.getDesc(), processInstance.getName()); // log the state this branch actually checks
putMsg(result, Status.PROCESS_INSTANCE_ALREADY_CHANGED, processInstance.getName(),
processInstance.getState());
} else {
@@ -439,7 +449,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
break;
default:
- logger.error("unknown execute type : {}", executeType);
+ logger.warn("Unknown execute type for process instance, processInstanceId:{}, executeType:{}.", processInstance.getId(), executeType); // keep the offending value in the log
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "unknown execute type");
break;
@@ -454,6 +464,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// check process instance exist
ProcessInstance processInstance = processInstanceMapper.selectById(taskGroupQueue.getProcessId());
if (processInstance == null) {
+ logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", taskGroupQueue.getProjectCode(), taskGroupQueue.getProcessId());
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, taskGroupQueue.getProcessId());
return result;
}
@@ -546,6 +557,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether the process is normal
if (update > 0) {
+ logger.info("Process instance state is updated to {} in database, processInstanceName:{}.", executionStatus.getDesc(), processInstance.getName());
// directly send the process instance state change event to target master, not guarantee the event send
// success
WorkflowStateEventChangeCommand workflowStateEventChangeCommand = new WorkflowStateEventChangeCommand(
@@ -554,6 +566,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
stateEventCallbackService.sendResult(host, workflowStateEventChangeCommand.convert2Command());
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Process instance state update error, processInstanceName:{}.", processInstance.getName());
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
return result;
@@ -568,12 +581,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
private Map<String, Object> forceStart(ProcessInstance processInstance, TaskGroupQueue taskGroupQueue) {
Map<String, Object> result = new HashMap<>();
if (taskGroupQueue.getStatus() != TaskGroupQueueStatus.WAIT_QUEUE) {
+ logger.warn("Task group queue already starts, taskGroupQueueId:{}.", taskGroupQueue.getId());
putMsg(result, Status.TASK_GROUP_QUEUE_ALREADY_START);
return result;
}
taskGroupQueue.setForceStart(Flag.YES.getCode());
processService.updateTaskGroupQueue(taskGroupQueue);
+ logger.info("Sending force start command to master.");
processService.sendStartTask2Master(processInstance, taskGroupQueue.getTaskId(),
org.apache.dolphinscheduler.remote.command.CommandType.TASK_FORCE_STATE_EVENT_REQUEST);
putMsg(result, Status.SUCCESS);
@@ -610,15 +625,22 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setProcessInstanceId(instanceId);
if (!processService.verifyIsNeedCreateCommand(command)) {
+ logger.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
+ processDefinitionCode, processVersion, instanceId);
putMsg(result, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, String.valueOf(processDefinitionCode));
return result;
}
+ logger.info("Creating command, commandInfo:{}.", command);
int create = processService.createCommand(command);
if (create > 0) {
+ logger.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
+ command.getCommandType().getDescp(), command.getProcessDefinitionCode(), processVersion, instanceId);
putMsg(result, Status.EXECUTE_PROCESS_INSTANCE_ERROR);
}
@@ -638,7 +660,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
- logger.error("process definition is not found");
+ logger.error("Process definition is not found, processDefinitionCode:{}.", processDefinitionCode);
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "processDefinitionCode");
return result;
}
@@ -653,9 +675,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* if there is no online process, exit directly
*/
if (processDefinitionTmp.getReleaseState() != ReleaseState.ONLINE) {
+ logger.warn("Subprocess definition {} of process definition {} is not {}.", processDefinitionTmp.getName(),
+ processDefinition.getName(), ReleaseState.ONLINE.getDescp());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinitionTmp.getName());
- logger.info("not release process definition id: {} , name : {}", processDefinitionTmp.getId(),
- processDefinitionTmp.getName());
return result;
}
}
@@ -734,12 +756,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// determine whether to complement
if (commandType == CommandType.COMPLEMENT_DATA) {
if (schedule == null || StringUtils.isEmpty(schedule)) {
+ logger.error("Create {} type command error because parameter schedule is invalid.", command.getCommandType().getDescp());
return 0;
}
if (!isValidateScheduleTime(schedule)) {
return 0;
}
try {
+ logger.info("Start to create {} command, processDefinitionCode:{}.", command.getCommandType().getDescp(), processDefineCode);
return createComplementCommandList(schedule, runMode, command, expectedParallelismNumber,
complementDependentMode);
} catch (CronParseException cronParseException) {
@@ -749,6 +773,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
} else {
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ logger.info("Creating command, commandInfo:{}.", command);
return processService.createCommand(command);
}
}
@@ -783,31 +808,42 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
switch (runMode) {
case RUN_MODE_SERIAL: {
+ logger.info("RunMode of {} command is serial run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (StringUtils.isNotEmpty(dateList)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, dateList);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ logger.info("Creating command, commandInfo:{}.", command);
createCount = processService.createCommand(command);
+ if (createCount > 0)
+ logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ else
+ logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
if (startDate != null && endDate != null) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startDate);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endDate);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ logger.info("Creating command, commandInfo:{}.", command);
createCount = processService.createCommand(command);
-
+ if (createCount > 0)
+ logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ else
+ logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
// dependent process definition
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
- logger.info("process code: {} complement dependent in off mode or schedule's size is 0, skip "
- + "dependent complement data", command.getProcessDefinitionCode());
+ logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
} else {
+ logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount += createComplementDependentCommand(schedules, command);
}
}
break;
}
case RUN_MODE_PARALLEL: {
+ logger.info("RunMode of {} command is parallel run, processDefinitionCode:{}.", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (startDate != null && endDate != null) {
List<Schedule> schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode(
command.getProcessDefinitionCode());
@@ -820,7 +856,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(createCount, expectedParallelismNumber);
}
- logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
+ logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount);
// Distribute the number of tasks equally to each command.
// The last command with insufficient quantity will be assigned to the remaining tasks.
@@ -845,14 +881,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE,
DateUtils.dateToString(listDate.get(endDateIndex)));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processService.createCommand(command);
-
+ logger.info("Creating command, commandInfo:{}.", command);
+ if (processService.createCommand(command) > 0)
+ logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ else
+ logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
if (schedules.isEmpty() || complementDependentMode == ComplementDependentMode.OFF_MODE) {
- logger.info(
- "process code: {} complement dependent in off mode or schedule's size is 0, skip "
- + "dependent complement data",
- command.getProcessDefinitionCode());
+ logger.info("Complement dependent mode is off mode or Scheduler is empty, so skip create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
} else {
+ logger.info("Complement dependent mode is all dependent and Scheduler is not empty, need create complement dependent command, processDefinitionCode:{}.", command.getProcessDefinitionCode());
dependentProcessDefinitionCreateCount +=
createComplementDependentCommand(schedules, command);
}
@@ -866,11 +903,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
if (expectedParallelismNumber != null && expectedParallelismNumber != 0) {
createCount = Math.min(createCount, expectedParallelismNumber);
}
- logger.info("In parallel mode, current expectedParallelismNumber:{}", createCount);
+ logger.info("Complement command run in parallel mode, current expectedParallelismNumber:{}.", createCount);
for (List<String> stringDate : Lists.partition(listDate, createCount)) {
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA, stringDate));
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- processService.createCommand(command);
+ logger.info("Creating command, commandInfo:{}.", command);
+ if (processService.createCommand(command) > 0)
+ logger.info("Create {} command complete, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
+ else
+ logger.error("Create {} command error, processDefinitionCode:{}", command.getCommandType().getDescp(), command.getProcessDefinitionCode());
}
}
}
@@ -879,7 +920,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
default:
break;
}
- logger.info("create complement command count: {}, create dependent complement command count: {}", createCount,
+ logger.info("Create complement command count:{}, Create dependent complement command count:{}", createCount,
dependentProcessDefinitionCreateCount);
return createCount;
}
@@ -894,7 +935,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
try {
dependentCommand = (Command) BeanUtils.cloneBean(command);
} catch (Exception e) {
- logger.error("copy dependent command error: ", e);
+ logger.error("Copy dependent command error.", e);
return dependentProcessDefinitionCreateCount;
}
@@ -912,6 +953,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Map<String, String> cmdParam = JSONUtils.toMap(dependentCommand.getCommandParam());
cmdParam.put(CMD_PARAM_START_NODES, String.valueOf(dependentProcessDefinition.getTaskDefinitionCode()));
dependentCommand.setCommandParam(JSONUtils.toJsonString(cmdParam));
+ logger.info("Creating complement dependent command, commandInfo:{}.", dependentCommand);
dependentProcessDefinitionCreateCount += processService.createCommand(dependentCommand);
}
@@ -991,11 +1033,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return false;
}
if (start.isAfter(end)) {
- logger.error("complement data error, wrong date start:{} and end date:{} ", start, end);
+ logger.error("Complement data parameter error, start time should be before end time, startDate:{}, endDate:{}.", start, end);
return false;
}
} catch (Exception ex) {
- logger.warn("Parse schedule time error, startDate: {}, endDate: {}", startDate, endDate);
+ logger.warn("Parse schedule time error, startDate:{}, endDate:{}.", startDate, endDate);
return false;
}
}
@@ -1024,6 +1066,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId).orElse(null);
if (processInstance == null) {
+ logger.error("Process instance does not exist, processInstanceId:{}.", processInstanceId);
return null;
}
Host host = new Host(processInstance.getHost());
@@ -1032,6 +1075,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
org.apache.dolphinscheduler.remote.command.Command command =
stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
if (command == null) {
+ logger.error("Query executing process instance from master error, processInstanceId:{}.", processInstanceId);
return null;
}
WorkflowExecutingDataResponseCommand responseCommand =
@@ -1076,8 +1120,11 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
org.apache.dolphinscheduler.remote.command.Command response =
stateEventCallbackService.sendSync(host, taskExecuteStartCommand.convert2Command());
if (response != null) {
+ logger.info("Send task execute start command complete, response is {}.", response);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Start to execute stream task instance error, projectCode:{}, taskDefinitionCode:{}, taskVersion:{}.",
+ projectCode, taskDefinitionCode, taskDefinitionVersion);
putMsg(result, Status.START_TASK_INSTANCE_ERROR);
}
return result;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/K8SNamespaceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/K8SNamespaceServiceImpl.java
index 9ead1cd306..4cfcb14b2c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/K8SNamespaceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/K8SNamespaceServiceImpl.java
@@ -89,6 +89,7 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
public Result queryListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Result result = new Result();
if (!isAdmin(loginUser)) {
+ logger.warn("Only admin can query namespace list, current login user name:{}.", loginUser.getUserName());
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -122,36 +123,43 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
Integer limitsMemory) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
+ logger.warn("Only admin can create K8s namespace, current login user name:{}.", loginUser.getUserName());
return result;
}
if (StringUtils.isEmpty(namespace)) {
+ logger.warn("Parameter namespace is empty.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.NAMESPACE);
return result;
}
if (clusterCode == null) {
+ logger.warn("Parameter clusterCode is null.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.CLUSTER);
return result;
}
if (limitsCpu != null && limitsCpu < 0.0) {
+ logger.warn("Parameter limitsCpu is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.LIMITS_CPU);
return result;
}
if (limitsMemory != null && limitsMemory < 0) {
+ logger.warn("Parameter limitsMemory is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.LIMITS_MEMORY);
return result;
}
if (checkNamespaceExistInDb(namespace, clusterCode)) {
+ logger.warn("K8S namespace already exists.");
putMsg(result, Status.K8S_NAMESPACE_EXIST, namespace, clusterCode);
return result;
}
Cluster cluster = clusterMapper.queryByClusterCode(clusterCode);
if (cluster == null) {
+ logger.error("Cluster does not exist, clusterCode:{}", clusterCode);
putMsg(result, Status.CLUSTER_NOT_EXISTS, namespace, clusterCode);
return result;
}
@@ -161,7 +169,7 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
code = CodeGenerateUtils.getInstance().genCode();
cluster.setCode(code);
} catch (CodeGenerateUtils.CodeGenerateException e) {
- logger.error("Cluster code get error, ", e);
+ logger.error("Generate cluster code error.", e);
}
if (code == 0L) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating cluster code");
@@ -188,13 +196,14 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
String yamlStr = genDefaultResourceYaml(k8sNamespaceObj);
k8sClientService.upsertNamespaceAndResourceToK8s(k8sNamespaceObj, yamlStr);
} catch (Exception e) {
- logger.error("namespace create to k8s error", e);
+ logger.error("Namespace create to k8s error", e);
putMsg(result, Status.K8S_CLIENT_OPS_ERROR, e.getMessage());
return result;
}
}
k8sNamespaceMapper.insert(k8sNamespaceObj);
+ logger.info("K8s namespace create complete, namespace:{}.", k8sNamespaceObj.getNamespace());
putMsg(result, Status.SUCCESS);
return result;
@@ -214,21 +223,25 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
Integer limitsMemory) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
+ logger.warn("Only admin can update K8s namespace, current login user name:{}.", loginUser.getUserName());
return result;
}
if (limitsCpu != null && limitsCpu < 0.0) {
+ logger.warn("Parameter limitsCpu is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.LIMITS_CPU);
return result;
}
if (limitsMemory != null && limitsMemory < 0) {
+ logger.warn("Parameter limitsMemory is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.LIMITS_MEMORY);
return result;
}
K8sNamespace k8sNamespaceObj = k8sNamespaceMapper.selectById(id);
if (k8sNamespaceObj == null) {
+ logger.error("K8s namespace does not exist, namespaceId:{}.", id);
putMsg(result, Status.K8S_NAMESPACE_NOT_EXIST, id);
return result;
}
@@ -243,14 +256,14 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
String yamlStr = genDefaultResourceYaml(k8sNamespaceObj);
k8sClientService.upsertNamespaceAndResourceToK8s(k8sNamespaceObj, yamlStr);
} catch (Exception e) {
- logger.error("namespace update to k8s error", e);
+ logger.error("Namespace update to k8s error", e);
putMsg(result, Status.K8S_CLIENT_OPS_ERROR, e.getMessage());
return result;
}
}
// update to db
k8sNamespaceMapper.updateById(k8sNamespaceObj);
-
+ logger.info("K8s namespace update complete, namespace:{}.", k8sNamespaceObj.getNamespace());
putMsg(result, Status.SUCCESS);
return result;
}
@@ -266,16 +279,19 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
public Result<Object> verifyNamespaceK8s(String namespace, Long clusterCode) {
Result<Object> result = new Result<>();
if (StringUtils.isEmpty(namespace)) {
+ logger.warn("Parameter namespace is empty.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.NAMESPACE);
return result;
}
if (clusterCode == null) {
+ logger.warn("Parameter clusterCode is null.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.CLUSTER);
return result;
}
if (checkNamespaceExistInDb(namespace, clusterCode)) {
+ logger.warn("K8S namespace already exists.");
putMsg(result, Status.K8S_NAMESPACE_EXIST, namespace, clusterCode);
return result;
}
@@ -295,11 +311,13 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
public Map<String, Object> deleteNamespaceById(User loginUser, int id) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
+ logger.warn("Only admin can delete K8s namespace, current login user name:{}.", loginUser.getUserName());
return result;
}
K8sNamespace k8sNamespaceObj = k8sNamespaceMapper.selectById(id);
if (k8sNamespaceObj == null) {
+ logger.error("K8s namespace does not exist, namespaceId:{}.", id);
putMsg(result, Status.K8S_NAMESPACE_NOT_EXIST, id);
return result;
}
@@ -307,11 +325,13 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
try {
k8sClientService.deleteNamespaceToK8s(k8sNamespaceObj.getNamespace(), k8sNamespaceObj.getClusterCode());
} catch (RemotingException e) {
+ logger.error("Namespace delete in k8s error, namespaceId:{}.", id, e);
putMsg(result, Status.K8S_CLIENT_OPS_ERROR, id);
return result;
}
}
k8sNamespaceMapper.deleteById(id);
+ logger.info("K8s namespace delete complete, namespace:{}.", k8sNamespaceObj.getNamespace());
putMsg(result, Status.SUCCESS);
return result;
}
@@ -375,7 +395,7 @@ public class K8SNamespaceServiceImpl extends BaseServiceImpl implements K8sNames
if (loginUser.getId() != userId && isNotAdmin(loginUser, result)) {
return result;
}
- // query all namespace list,this auth does not like project
+ // query all namespace list, this auth does not like project
List<K8sNamespace> namespaceList = k8sNamespaceMapper.selectList(null);
List<K8sNamespace> resultList = new ArrayList<>();
Set<K8sNamespace> namespaceSet;
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
index 518a03004a..e3f83f3c1f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
@@ -93,9 +93,11 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null) {
+ logger.error("Task instance does not exist, taskInstanceId:{}.", taskInstId);
return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
}
if (StringUtils.isBlank(taskInstance.getHost())) {
+ logger.error("Host of task instance is null, taskInstanceId:{}.", taskInstId);
return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
}
Result<ResponseTaskLog> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
@@ -197,8 +199,8 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
Host host = Host.of(taskInstance.getHost());
- logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
- host.getPort());
+ logger.info("Query task instance log, taskInstanceId:{}, taskInstanceName:{}, host:{}, logPath:{}, port:{}",
+ taskInstance.getId(), taskInstance.getName(), host.getIp(), taskInstance.getLogPath(), host.getPort());
StringBuilder log = new StringBuilder();
if (skipLineNum == 0) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index d2e957bd58..6a172d4936 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -117,18 +117,7 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -261,11 +250,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
if (checkDescriptionLength(description)) {
+ logger.warn("Parameter description is too long.");
throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR);
}
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
+ logger.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.",
+ definition.getName(), definition.getCode());
throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, name);
}
List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
@@ -274,6 +266,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
+ logger.error("Tenant does not exist.");
throw new ServiceException(Status.TENANT_NOT_EXIST);
}
tenantId = tenant.getId();
@@ -298,18 +291,26 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.info("The task has not changed, so skip");
}
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
+ logger.error("Save task definition error.");
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) {
+ logger.error("Save process definition error, processCode:{}.", processDefinition.getCode());
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
- }
+ } else
+ logger.info("Save process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(),
insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult != Constants.EXIT_CODE_SUCCESS) {
+ logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
- }
+ } else
+ logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
+
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
putMsg(result, Status.SUCCESS);
@@ -367,19 +368,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Set<Long> taskNodeCodes = taskNodeList.stream().map(TaskNode::getCode).collect(Collectors.toSet());
Collection<Long> codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes);
if (CollectionUtils.isNotEmpty(codes)) {
- logger.error("the task code is not exist");
- throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(codes, Constants.COMMA));
+ String taskCodes = StringUtils.join(codes, Constants.COMMA);
+ logger.error("Task definitions do not exist, taskCodes:{}.", taskCodes);
+ throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCodes);
}
}
if (graphHasCycle(taskNodeList)) {
- logger.error("process DAG has cycle");
+ logger.error("Process DAG has cycle.");
throw new ServiceException(Status.PROCESS_NODE_HAS_CYCLE);
}
// check whether the task relation json is normal
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
if (processTaskRelationLog.getPostTaskCode() == 0) {
- logger.error("the post_task_code or post_task_version can't be zero");
+ logger.error("The post_task_code or post_task_version of processTaskRelationLog can not be zero, " +
+ "processTaskRelationLogId:{}.", processTaskRelationLog.getId());
throw new ServiceException(Status.CHECK_PROCESS_TASK_RELATION_ERROR);
}
}
@@ -519,6 +522,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, processCode:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
} else {
Tenant tenant = tenantMapper.queryById(processDefinition.getTenantId());
@@ -544,6 +548,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name);
if (processDefinition == null) {
+ logger.error("Process definition does not exist, projectCode:{}.", projectCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, name);
} else {
DagData dagData = processService.genDagData(processDefinition);
@@ -593,6 +598,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
if (checkDescriptionLength(description)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -603,6 +609,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
+ logger.error("Tenant does not exist.");
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
@@ -612,11 +619,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
// check process definition exists
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, processCode:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
// online can not permit edit
+ logger.warn("Process definition is not allowed to be modified due to {}, processDefinitionCode:{}.",
+ ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
return result;
}
@@ -624,6 +634,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
+ logger.warn("Process definition with the same name already exists, processDefinitionCode:{}.",
+ definition.getCode());
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
@@ -661,6 +673,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskDepMsg.ifPresent(sb::append);
}
if (sb.length() != 0) {
+ logger.error("Task cannot be deleted because it is dependent");
throw new ServiceException(sb.toString());
}
}
@@ -679,6 +692,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.info("The task has not changed, so skip");
}
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
+ logger.error("Update task definitions error, projectCode:{}, processCode:{}.", processDefinition.getProjectCode(), processDefinition.getCode());
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
@@ -704,26 +718,36 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
isChange = true;
}
if (isChange) {
+ logger.info("Process definition needs to be updated, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
processDefinition.setUpdateTime(new Date());
int insertVersion =
processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
+ logger.error("Update process definition error, processCode:{}.", processDefinition.getCode());
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
+ } // the failure branch above always throws, so the success log needs no unbraced else
+ logger.info("Update process definition complete, processCode:{}, processVersion:{}.", processDefinition.getCode(), insertVersion);
taskUsedInOtherTaskValid(processDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+ logger.info("Update process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
+ logger.error("Update process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
} else {
+ logger.info("Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
}
@@ -739,8 +763,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return true if process definition name not exists, otherwise false
*/
@Override
- public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name,
- long processDefinitionCode) {
+ public Map<String, Object> verifyProcessDefinitionName(User loginUser, long projectCode, String name, long processDefinitionCode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
@@ -758,6 +781,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.SUCCESS);
return result;
}
+ logger.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.",
+ processDefinition.getName(), processDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name.trim());
return result;
}
@@ -772,6 +797,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
private void processDefinitionUsedInOtherTaskValid(ProcessDefinition processDefinition) {
// check process definition is already online
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
+ logger.warn("Process definition can not be deleted due to {}, processDefinitionCode:{}.",
+ ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
throw new ServiceException(Status.PROCESS_DEFINE_STATE_ONLINE, processDefinition.getName());
}
@@ -779,6 +806,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessInstance> processInstances = processInstanceService
.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
+ logger.warn("Process definition can not be deleted because there are {} executing process instances, processDefinitionCode:{}",
+ processInstances.size(), processDefinition.getCode());
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_EXECUTING_FAIL, processInstances.size());
}
@@ -790,6 +819,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
.map(task -> String.format(Constants.FORMAT_S_S_COLON, task.getProcessDefinitionName(),
task.getTaskName()))
.collect(Collectors.joining(Constants.COMMA));
+ logger.warn("Process definition can not be deleted due to being referenced by other tasks:{}, processDefinitionCode:{}",
+ taskDepDetail, processDefinition.getCode());
throw new ServiceException(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL, taskDepDetail);
}
}
@@ -814,12 +845,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, processCode:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
// Determine if the login user is the owner of the process definition
if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
+ logger.warn("User does not have permission for process definition, userId:{}, processDefinitionCode:{}.", loginUser.getId(), code);
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -832,23 +865,27 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (scheduleObj.getReleaseState() == ReleaseState.OFFLINE) {
int delete = scheduleMapper.deleteById(scheduleObj.getId());
if (delete == 0) {
+ logger.error("Delete schedule of process definition error, processDefinitionCode:{}, scheduleId:{}.", code, scheduleObj.getId());
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
}
if (scheduleObj.getReleaseState() == ReleaseState.ONLINE) {
+ logger.warn("Process definition can not be deleted due to schedule {}, processDefinitionCode:{}, scheduleId:{}.",
+ ReleaseState.ONLINE.getDescp(), processDefinition.getCode(), scheduleObj.getId());
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, scheduleObj.getId());
return result;
}
}
int delete = processDefinitionMapper.deleteById(processDefinition.getId());
if (delete == 0) {
+ logger.error("Delete process definition error, processDefinitionCode:{}.", code);
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (deleteRelation == 0) {
- logger.warn("The process definition has not relation, it will be delete successfully");
+ logger.warn("The process definition has not relation, it will be delete successfully, processDefinitionCode:{}.", code);
}
deleteOtherRelation(project, result, processDefinition);
putMsg(result, Status.SUCCESS);
@@ -884,6 +921,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
@@ -892,27 +930,32 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessTaskRelation> relationList =
processService.findRelationByCode(code, processDefinition.getVersion());
if (CollectionUtils.isEmpty(relationList)) {
+ logger.warn("Process definition has no task relation, processDefinitionCode:{}.", code);
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
+ logger.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
break;
case OFFLINE:
processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition);
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code);
- if (updateProcess > 0 && schedule != null) {
- logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}",
- projectCode, schedule.getId(), code);
- // set status
- schedule.setReleaseState(releaseState);
- int updateSchedule = scheduleMapper.updateById(schedule);
- if (updateSchedule == 0) {
- putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
- throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
+ if (updateProcess > 0) {
+ logger.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
+ if (schedule != null) {
+ // set status
+ schedule.setReleaseState(releaseState);
+ int updateSchedule = scheduleMapper.updateById(schedule);
+ if (updateSchedule == 0) {
+ logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId());
+ putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
+ throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
+ } // the failure branch above always throws, so the success log needs no unbraced else
+ logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, schedule.getId());
+ schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
- schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
break;
default:
@@ -931,6 +974,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String codes,
HttpServletResponse response) {
if (StringUtils.isEmpty(codes)) {
+ logger.warn("Process definition codes to be exported is empty.");
return;
}
Project project = projectMapper.queryByCode(projectCode);
@@ -944,6 +988,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
.collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet);
if (CollectionUtils.isEmpty(processDefinitionList)) {
+ logger.error("Process definitions to be exported do not exist, processDefinitionCodes:{}.", defineCodeSet);
return;
}
// check processDefinition exist in project
@@ -952,8 +997,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<DagDataSchedule> dagDataSchedules =
processDefinitionListInProject.stream().map(this::exportProcessDagData).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(dagDataSchedules)) {
+ logger.info("Start download process definition file, processDefinitionCodes:{}.", defineCodeSet);
downloadProcessDefinitionFile(response, dagDataSchedules);
- }
+ } else
+ logger.error("There is no exported process dag data.");
}
/**
@@ -970,20 +1017,20 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
buff.flush();
buff.close();
} catch (IOException e) {
- logger.warn("export process fail", e);
+ logger.warn("Export process definition fail", e);
} finally {
if (null != buff) {
try {
buff.close();
} catch (Exception e) {
- logger.warn("export process buffer not close", e);
+ logger.warn("Buffer does not close", e);
}
}
if (null != out) {
try {
out.close();
} catch (Exception e) {
- logger.warn("export process output stream not close", e);
+ logger.warn("Output stream does not close", e);
}
}
}
@@ -1026,6 +1073,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
// check file content
if (CollectionUtils.isEmpty(dagDataScheduleList)) {
+ logger.warn("Process definition file content is empty.");
putMsg(result, Status.DATA_IS_NULL, "fileContent");
return result;
}
@@ -1095,7 +1143,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
long compressionRatio = totalSizeEntry / entry.getCompressedSize();
if (compressionRatio > THRESHOLD_RATIO) {
throw new IllegalStateException(
- "ratio between compressed and uncompressed data is highly suspicious, looks like a Zip Bomb Attack");
+ "Ratio between compressed and uncompressed data is highly suspicious, looks like a Zip Bomb Attack.");
}
int commentIndex = line.indexOf("-- ");
if (commentIndex >= 0) {
@@ -1143,6 +1191,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
dataSource = queryDatasourceByNameAndUser(datasourceName, loginUser);
}
if (dataSource == null) {
+ logger.error("Datasource does not found, may be its name is illegal.");
putMsg(result, Status.DATASOURCE_NAME_ILLEGAL);
return result;
}
@@ -1167,7 +1216,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
}
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.error("Import process definition error.", e);
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
return result;
}
@@ -1272,6 +1321,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
try {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
+ logger.error("Save process definition error because generate process definition code error, projectCode:{}.", projectCode, e);
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
return false;
}
@@ -1294,7 +1344,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskCodeMap.put(taskDefinitionLog.getCode(), code);
taskDefinitionLog.setCode(code);
} catch (CodeGenerateException e) {
- logger.error("Task code get error, ", e);
+ logger.error("Generate task definition code error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode(), e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return false;
}
@@ -1303,6 +1353,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogList);
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
if ((logInsert & insert) == 0) {
+ logger.error("Save task definition error, projectCode:{}, processDefinitionCode:{}", projectCode, processDefinition.getCode());
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
@@ -1345,6 +1396,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(createDagResult, Status.SUCCESS);
} else {
result.putAll(createDagResult);
+ logger.error("Import process definition error, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
}
@@ -1357,10 +1409,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
schedule.setUpdateTime(now);
int scheduleInsert = scheduleMapper.insert(schedule);
if (0 == scheduleInsert) {
+ logger.error("Import process definition error due to save schedule fail, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
}
}
+
+ logger.info("Import process definition complete, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
return true;
}
@@ -1369,14 +1424,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*/
private boolean checkImportanceParams(DagDataSchedule dagDataSchedule, Map<String, Object> result) {
if (dagDataSchedule.getProcessDefinition() == null) {
+ logger.warn("Process definition is null.");
putMsg(result, Status.DATA_IS_NULL, "ProcessDefinition");
return false;
}
if (CollectionUtils.isEmpty(dagDataSchedule.getTaskDefinitionList())) {
+ logger.warn("Task definition list is null.");
putMsg(result, Status.DATA_IS_NULL, "TaskDefinitionList");
return false;
}
if (CollectionUtils.isEmpty(dagDataSchedule.getProcessTaskRelationList())) {
+ logger.warn("Process task relation list is null.");
putMsg(result, Status.DATA_IS_NULL, "ProcessTaskRelationList");
return false;
}
@@ -1411,7 +1469,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Map<String, Object> result = new HashMap<>();
try {
if (processTaskRelationJson == null) {
- logger.error("process data is null");
+ logger.error("Process task relation data is null.");
putMsg(result, Status.DATA_IS_NOT_VALID, processTaskRelationJson);
return result;
}
@@ -1422,14 +1480,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<TaskNode> taskNodes = processService.transformTask(taskRelationList, taskDefinitionLogsList);
if (CollectionUtils.isEmpty(taskNodes)) {
- logger.error("process node info is empty");
+ logger.error("Task node data is empty.");
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
// check has cycle
if (graphHasCycle(taskNodes)) {
- logger.error("process DAG has cycle");
+ logger.error("Process DAG has cycle.");
putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
return result;
}
@@ -1442,7 +1500,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
.dependence(taskNode.getDependence())
.switchResult(taskNode.getSwitchResult())
.build())) {
- logger.error("task node {} parameter invalid", taskNode.getName());
+ logger.error("Task node {} parameter invalid.", taskNode.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName());
return result;
}
@@ -1477,7 +1535,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
- logger.info("process define not exists");
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
@@ -1509,7 +1567,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
.collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet);
if (CollectionUtils.isEmpty(processDefinitionList)) {
- logger.info("process definition not exists");
+ logger.error("Process definitions do not exist, codes:{}.", defineCodeSet);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
}
@@ -1521,6 +1579,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream()
.filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
if (CollectionUtils.isEmpty(processDefinitionListInProject)) {
+ // processDefinitionListInProject is empty in this branch, so it must not be dereferenced
+ // (get(0) would throw IndexOutOfBoundsException); log the requested codes instead.
+ logger.error("Process definitions do not exist in project, processDefinitionsCodes:{}.",
+ codes);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
}
@@ -1627,7 +1689,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*/
@Override
public Map<String, Object> viewTree(User loginUser, long projectCode, long code, Integer limit) {
- Map<String, Object> result;
+ Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
result = projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_TREE_VIEW);
@@ -1636,15 +1698,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (null == processDefinition || projectCode != processDefinition.getProjectCode()) {
- logger.info("process define not exists");
+ logger.error("Process definition does not exist, code:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
- // nodes that is running
+ // nodes that are running
Map<String, List<TreeViewDto>> runningNodeMap = new ConcurrentHashMap<>();
- // nodes that is waiting to run
+ // nodes that are waiting to run
Map<String, List<TreeViewDto>> waitingRunningNodeMap = new ConcurrentHashMap<>();
// List of process instances
@@ -1656,6 +1718,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
+ if (limit < 0) {
+ putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
+ return result;
+ }
if (limit > processInstanceList.size()) {
limit = processInstanceList.size();
}
@@ -1685,17 +1751,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
while (!ServerLifeCycleManager.isStopped()) {
Set<String> postNodeList;
- Set<Map.Entry<String, List<TreeViewDto>>> entries = runningNodeMap.entrySet();
- List<Integer> processInstanceIds = processInstanceList.stream()
- .limit(limit).map(ProcessInstance::getId).collect(Collectors.toList());
- List<Long> nodeCodes = entries.stream().map(e -> Long.parseLong(e.getKey())).collect(Collectors.toList());
- List<TaskInstance> taskInstances;
- if (processInstanceIds.isEmpty() || nodeCodes.isEmpty()) {
- taskInstances = Collections.emptyList();
- } else {
- taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(processInstanceIds, nodeCodes);
- }
- for (Map.Entry<String, List<TreeViewDto>> en : entries) {
+ Iterator<Map.Entry<String, List<TreeViewDto>>> iter = runningNodeMap.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, List<TreeViewDto>> en = iter.next();
String nodeCode = en.getKey();
parentTreeViewDtoList = en.getValue();
@@ -1707,14 +1765,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// set treeViewDto instances
for (int i = limit - 1; i >= 0; i--) {
ProcessInstance processInstance = processInstanceList.get(i);
- TaskInstance taskInstance = null;
- for (TaskInstance instance : taskInstances) {
- if (instance.getTaskCode() == Long.parseLong(nodeCode)
- && instance.getProcessInstanceId() == processInstance.getId()) {
- taskInstance = instance;
- break;
- }
- }
+ TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(),
+ Long.parseLong(nodeCode));
if (taskInstance == null) {
treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null"));
} else {
@@ -1838,6 +1890,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
if (projectCode == targetProjectCode) {
+ logger.warn("Project code is same as target project code, projectCode:{}.", projectCode);
return result;
}
@@ -1859,6 +1912,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
if (StringUtils.isEmpty(processDefinitionCodes)) {
+ logger.error("Parameter processDefinitionCodes is empty, projectCode is {}.", projectCode);
putMsg(result, Status.PROCESS_DEFINITION_CODES_IS_EMPTY, processDefinitionCodes);
return result;
}
@@ -1899,10 +1953,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setProjectCode(targetProjectCode);
String otherParamsJson = doOtherOperateProcess(loginUser, processDefinition);
if (isCopy) {
+ logger.info("Copy process definition...");
List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations);
Map<Long, Long> taskCodeMap = new HashMap<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (COMPLEX_TASK_TYPES.contains(taskDefinitionLog.getTaskType())) {
+ logger.error("Task types {} do not support copy.", taskDefinitionLog.getTaskType());
putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType());
return;
}
@@ -1911,6 +1967,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskCodeMap.put(taskDefinitionLog.getCode(), taskCode);
taskDefinitionLog.setCode(taskCode);
} catch (CodeGenerateException e) {
+ logger.error("Generate task definition code error, projectCode:{}.", targetProjectCode, e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
@@ -1931,6 +1988,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
try {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
+ logger.error("Generate process definition code error, projectCode:{}.", targetProjectCode, e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
}
@@ -1959,6 +2017,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
scheduleObj.setUpdateTime(date);
int insertResult = scheduleMapper.insert(scheduleObj);
if (insertResult != 1) {
+ logger.error("Schedule create error, processDefinitionCode:{}.", processDefinition.getCode());
putMsg(result, Status.CREATE_SCHEDULE_ERROR);
throw new ServiceException(Status.CREATE_SCHEDULE_ERROR);
}
@@ -1967,14 +2026,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs,
otherParamsJson));
} catch (Exception e) {
+ logger.error("Copy process definition error, processDefinitionCode from {} to {}.", oldProcessDefinitionCode, processDefinition.getCode(), e);
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);
}
} else {
+ logger.info("Move process definition...");
try {
result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null,
Lists.newArrayList(), otherParamsJson));
} catch (Exception e) {
+ logger.error("Move process definition error, processDefinitionCode:{}.", processDefinition.getCode(), e);
putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR);
}
@@ -2030,6 +2092,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Switch process definition error because it does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, code);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code);
return result;
}
@@ -2037,15 +2100,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinitionLog processDefinitionLog =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, version);
if (Objects.isNull(processDefinitionLog)) {
+ logger.error("Switch process definition error because version does not exist, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR,
processDefinition.getCode(), version);
return result;
}
int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
if (switchVersion <= 0) {
+ logger.error("Switch process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
}
+ logger.info("Switch process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -2062,14 +2128,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode,
Map<String, Object> result, List<String> failedProcessList, boolean isCopy) {
if (!failedProcessList.isEmpty()) {
+ String failedProcess = String.join(",", failedProcessList); // joined once, shared by the log line and the result message
if (isCopy) {
- putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode,
- String.join(",", failedProcessList));
+ logger.error("Copy process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
+ srcProjectCode, targetProjectCode, failedProcess);
+ putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
} else {
- putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode,
- String.join(",", failedProcessList));
+ logger.error("Move process definition error, srcProjectCode:{}, targetProjectCode:{}, failedProcessList:{}.",
+ srcProjectCode, targetProjectCode, failedProcess);
+ putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR, srcProjectCode, targetProjectCode, failedProcess);
}
} else {
+ logger.info("Batch {} process definition complete, srcProjectCode:{}, targetProjectCode:{}.", isCopy ? "copy" : "move", srcProjectCode, targetProjectCode);
putMsg(result, Status.SUCCESS);
}
}
@@ -2133,19 +2203,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, code:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
} else {
if (processDefinition.getVersion() == version) {
+ logger.warn("Process definition can not be deleted due to version is being used, projectCode:{}, processDefinitionCode:{}, version:{}.",
+ projectCode, code, version);
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
return result;
}
int deleteLog = processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version);
int deleteRelationLog = processTaskRelationLogMapper.deleteByCode(code, version);
if (deleteLog == 0 || deleteRelationLog == 0) {
+ logger.error("Delete process definition version error, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
deleteOtherRelation(project, result, processDefinition);
+ logger.info("Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version);
putMsg(result, Status.SUCCESS);
}
return result;
@@ -2183,12 +2258,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
if (checkDescriptionLength(description)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
+ logger.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.",
+ definition.getName(), definition.getCode());
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
@@ -2197,6 +2275,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
+ logger.error("Tenant does not exist.");
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
@@ -2206,6 +2285,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
try {
processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
} catch (CodeGenerateException e) {
+ logger.error("Generate process definition code error, projectCode:{}.", projectCode, e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
return result;
}
@@ -2215,6 +2295,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setExecutionType(executionType);
result = createEmptyDagDefine(loginUser, processDefinition);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ logger.error("Create empty process definition error.");
return result;
}
@@ -2236,6 +2317,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Map<String, Object> result = new HashMap<>();
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) {
+ logger.error("Save process definition error, processDefinitionCode:{}.", processDefinition.getCode());
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
}
@@ -2255,12 +2337,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Date now = new Date();
scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) {
- logger.warn("The start time must not be the same as the end");
+ logger.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.", processDefinition.getCode());
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
if (!org.quartz.CronExpression.isValidExpression(scheduleObj.getCrontab())) {
- logger.error("{} verify failure", scheduleObj.getCrontab());
+ logger.error("CronExpression verify failure, cron:{}.", scheduleObj.getCrontab());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleObj.getCrontab());
return result;
}
@@ -2322,6 +2404,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
if (checkDescriptionLength(description)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -2329,6 +2412,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
+ logger.error("Tenant does not exist.");
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
@@ -2338,11 +2422,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
// check process definition exists
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, code:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
// online can not permit edit
+ logger.warn("Process definition is not allowed to be modified due to {}, processDefinitionCode:{}.",
+ ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
return result;
}
@@ -2350,6 +2437,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
+ logger.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.",
+ definition.getName(), definition.getCode());
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
@@ -2363,6 +2452,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy,
Lists.newArrayList(), otherParamsJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ logger.error("Update process definition basic info error.");
return result;
}
@@ -2446,11 +2536,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
+ logger.error("Process definition does not exist, code:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code);
if (scheduleObj == null) {
+ logger.error("Schedule cron does not exist, processDefinitionCode:{}.", code);
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, "processDefinitionCode:" + code);
return result;
}
@@ -2459,6 +2551,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessTaskRelation> relationList =
processService.findRelationByCode(code, processDefinition.getVersion());
if (CollectionUtils.isEmpty(relationList)) {
+ logger.warn("Process definition has no task relation, processDefinitionCode:{}.", code);
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
@@ -2470,12 +2563,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition);
if (updateProcess > 0) {
- logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}",
- projectCode, scheduleObj.getId(), code);
+ logger.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}.",
+ projectCode, code, scheduleObj.getId());
// set status
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
int updateSchedule = scheduleMapper.updateById(scheduleObj);
if (updateSchedule == 0) {
+ logger.error("Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", projectCode, code, scheduleObj.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index cf98c5a3b4..7dcaee9365 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -73,6 +73,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -101,13 +104,18 @@ import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STAT
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
+import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
+
/**
* process instance service impl
*/
@Service
public class ProcessInstanceServiceImpl extends BaseServiceImpl implements ProcessInstanceService {
+ private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceServiceImpl.class);
+
public static final String TASK_TYPE = "taskType";
+
public static final String LOCAL_PARAMS_LIST = "localParamsList";
@Autowired
@@ -233,6 +241,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.getProcessDefinitionVersion());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, projectCode:{}.", projectCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else {
processInstance.setLocations(processDefinition.getLocations());
@@ -348,6 +357,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processId));
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, projectCode:{}, processInstanceId:{}.", projectCode, processId);
putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processId);
return result;
}
@@ -368,6 +378,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
private void addDependResultForTaskList(List<TaskInstance> taskInstanceList) throws IOException {
for (TaskInstance taskInstance : taskInstanceList) {
if (TASK_TYPE_DEPENDENT.equalsIgnoreCase(taskInstance.getTaskType())) {
+ logger.info("DEPENDENT type task instance need to set dependent result, taskCode:{}, taskInstanceId:{}", taskInstance.getTaskCode(), taskInstance.getId());
Result<ResponseTaskLog> logResult = loggerService.queryLog(
taskInstance.getId(), Constants.LOG_QUERY_SKIP_LINE_NUMBER, Constants.LOG_QUERY_LIMIT);
if (logResult.getCode() == Status.SUCCESS.ordinal()) {
@@ -383,6 +394,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
public Map<String, DependResult> parseLogForDependentResult(String log) throws IOException {
Map<String, DependResult> resultMap = new HashMap<>();
if (StringUtils.isEmpty(log)) {
+ logger.warn("Log content is empty.");
return resultMap;
}
@@ -428,17 +440,20 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
TaskInstance taskInstance = processService.findTaskInstanceById(taskId);
if (taskInstance == null) {
+ logger.error("Task instance does not exist, projectCode:{}, taskInstanceId:{}.", projectCode, taskId);
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskInstance.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
+ logger.error("Task definition does not exist, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskInstance.getTaskCode());
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
return result;
}
if (!taskInstance.isSubProcess()) {
+ logger.warn("Task instance is not {} type instance, projectCode:{}, taskInstanceId:{}.", TASK_TYPE_SUB_PROCESS, projectCode, taskId);
putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, taskInstance.getName());
return result;
}
@@ -446,6 +461,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessInstance subWorkflowInstance = processService.findSubProcessInstance(
taskInstance.getProcessInstanceId(), taskInstance.getId());
if (subWorkflowInstance == null) {
+ logger.error("SubProcess instance does not exist, projectCode:{}, taskInstanceId:{}.", projectCode, taskInstance.getId());
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId);
return result;
}
@@ -485,17 +501,19 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
- //check process instance exists
+ // check process instance exists
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
- //check process instance exists in project
+ // check process instance exists in project
ProcessDefinition processDefinition0 = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition0 != null && projectCode != processDefinition0.getProjectCode()) {
+ logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processInstance.getProcessDefinitionCode());
putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
//check process instance status
if (!processInstance.getState().isFinished()) {
+ logger.warn("Process Instance state is {} so can not update process instance, processInstanceId:{}.", processInstance.getState().getDesc(), processInstanceId);
putMsg(result, PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
@@ -513,6 +531,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
setProcessInstance(processInstance, tenantCode, scheduleTime, globalParams, timeout, timezoneId);
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
if (taskDefinitionLogs.isEmpty()) {
+ logger.warn("Parameter taskDefinitionJson is empty");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return result;
}
@@ -522,12 +541,14 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())
.build())) {
+ logger.error("Task parameters are invalid, taskDefinitionName:{}.", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result;
}
}
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, syncDefine);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
+ logger.error("Update task definition error, projectCode:{}, processInstanceId:{}", projectCode, processInstanceId);
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
@@ -544,6 +565,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
+ logger.error("Tenant does not exist.");
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
@@ -554,24 +576,34 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, syncDefine, Boolean.FALSE);
if (insertVersion == 0) {
+ logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.", projectCode, processDefinition.getName());
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
+ } else
+ logger.info("Update process definition complete, projectCode:{}, processDefinitionName:{}.", projectCode, processDefinition.getName());
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, syncDefine);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+ logger.info("Update task relations complete, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.",
+ projectCode, processDefinition.getCode(), insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
+ logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.",
+ projectCode, processDefinition.getCode(), insertVersion);
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
processInstance.setProcessDefinitionVersion(insertVersion);
int update = processInstanceDao.updateProcessInstance(processInstance);
if (update == 0) {
+ logger.error("Update process instance version error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}",
+ projectCode, processDefinition.getCode(), insertVersion);
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_INSTANCE_ERROR);
}
+ logger.info("Update process instance complete, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}",
+ projectCode, processDefinition.getCode(), insertVersion, processInstanceId);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -617,12 +649,14 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, subId));
if (subInstance.getIsSubProcess() == Flag.NO) {
+ logger.warn("Process instance is not sub process instance type, processInstanceId:{}, processInstanceName:{}.", subId, subInstance.getName());
putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName());
return result;
}
ProcessInstance parentWorkflowInstance = processService.findParentProcessInstance(subId);
if (parentWorkflowInstance == null) {
+ logger.error("Parent process instance does not exist, projectCode:{}, subProcessInstanceId:{}.", projectCode, subId);
putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST);
return result;
}
@@ -655,12 +689,16 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
// check process instance status
if (!processInstance.getState().isFinished()) {
+ logger.warn("Process Instance state is {} so can not delete process instance, processInstanceId:{}.",
+ processInstance.getState().getDesc(), processInstanceId);
throw new ServiceException(PROCESS_INSTANCE_STATE_OPERATION_ERROR, processInstance.getName(), processInstance.getState(), "delete");
}
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, projectCode:{}, ProcessDefinitionCode:{}.",
+ projectCode, processInstance.getProcessDefinitionCode());
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
}
@@ -668,6 +706,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processService.removeTaskLogFile(processInstanceId);
} catch (Exception ignore) {
// ignore
+ logger.warn("Remove task log file exception, projectCode:{}, ProcessDefinitionCode:{}, processInstanceId:{}.",
+ projectCode, processInstance.getProcessDefinitionCode(), processInstanceId);
}
// delete database cascade
@@ -678,8 +718,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processService.deleteWorkTaskInstanceByProcessInstanceId(processInstanceId);
if (delete > 0) {
+ logger.info("Delete process instance complete, projectCode:{}, ProcessDefinitionCode:{}, processInstanceId:{}.",
+ projectCode, processInstance.getProcessDefinitionCode(), processInstanceId);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Delete process instance error, projectCode:{}, ProcessDefinitionCode:{}, processInstanceId:{}.",
+ projectCode, processInstance.getProcessDefinitionCode(), processInstanceId);
putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
}
@@ -701,12 +745,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
if (processInstance == null) {
- throw new RuntimeException("workflow instance is null");
+ logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", projectCode, processInstanceId);
+ putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
+ return result;
}
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processInstance.getProcessDefinitionCode());
putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
@@ -789,13 +836,16 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
if (processInstance == null) {
- throw new RuntimeException("workflow instance is null");
+ logger.error("Process instance does not exist, projectCode:{}, processInstanceId:{}.", projectCode, processInstanceId);
+ putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
+ return result;
}
ProcessDefinition processDefinition = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processInstance.getProcessDefinitionCode());
putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
index 0e41c774e3..79bcc951e3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
@@ -53,6 +53,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -65,6 +67,8 @@ import com.google.common.collect.Lists;
@Service
public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements ProcessTaskRelationService {
+ private static final Logger logger = LoggerFactory.getLogger(ProcessTaskRelationServiceImpl.class);
+
@Autowired
private ProjectMapper projectMapper;
@@ -108,10 +112,12 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
+ logger.error("Process definition does not exist, processCode:{}.", processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
if (processDefinition.getProjectCode() != projectCode) {
+ logger.error("Process definition's project does not match project {}.", projectCode);
putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
return result;
}
@@ -178,9 +184,12 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
ProcessDefinition processDefinition) {
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
+ logger.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.", processDefinition.getProjectCode(), processDefinition.getCode());
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
+ } else
+ logger.info("Update process definition complete, new version is {}, projectCode:{}, processDefinitionCode:{}.",
+ insertVersion, processDefinition.getProjectCode(), processDefinition.getCode());
processDefinition.setVersion(insertVersion);
}
@@ -204,16 +213,20 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
if (taskCode == 0) {
+ logger.error("Delete task process relation error due to parameter taskCode is 0, projectCode:{}, processDefinitionCode:{}.",
+ projectCode, processDefinitionCode);
putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (null == taskDefinition) {
+ logger.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return result;
}
@@ -221,6 +234,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
+ logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode);
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result;
}
@@ -234,7 +248,10 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
}
if (CollectionUtils.isNotEmpty(downstreamList)) {
- putMsg(result, Status.TASK_HAS_DOWNSTREAM, StringUtils.join(downstreamList, ","));
+ String downstream = StringUtils.join(downstreamList, ",");
+ logger.warn("Relation can not be deleted because task has downstream tasks:[{}], projectCode:{}, processDefinitionCode:{}, taskDefinitionCode:{}.",
+ downstream, projectCode, processDefinitionCode, taskCode);
+ putMsg(result, Status.TASK_HAS_DOWNSTREAM, downstream);
return result;
}
updateProcessDefiniteVersion(loginUser, result, processDefinition);
@@ -244,9 +261,11 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
|| TASK_TYPE_SUB_PROCESS.equals(taskDefinition.getTaskType())) {
int deleteTaskDefinition = taskDefinitionMapper.deleteByCode(taskCode);
if (0 == deleteTaskDefinition) {
+ logger.error("Delete task definition error, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
- }
+ } else
+ logger.info("Delete {} type task definition complete, taskDefinitionCode:{}.", taskDefinition.getTaskType(), taskCode);
}
putMsg(result, Status.SUCCESS);
return result;
@@ -260,9 +279,13 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processDefinition.getCode(),
processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+ logger.info("Update task relations complete, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
+ logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}, processDefinitionVersion:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
@@ -288,11 +311,13 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
if (StringUtils.isEmpty(preTaskCodes)) {
+ logger.warn("Parameter preTaskCodes is empty.");
putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result;
}
List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (CollectionUtils.isEmpty(upstreamList)) {
+ logger.error("Upstream tasks based on the task do not exist, theTaskDefinitionCode:{}.", taskCode);
putMsg(result, Status.DATA_IS_NULL, "taskCode");
return result;
}
@@ -300,12 +325,14 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
List<Long> preTaskCodeList = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream()
.map(Long::parseLong).collect(Collectors.toList());
if (preTaskCodeList.contains(0L)) {
+ logger.warn("Parameter preTaskCodes contains 0.");
putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result;
}
List<Long> currentUpstreamList =
upstreamList.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toList());
if (currentUpstreamList.contains(0L)) {
+ logger.error("Upstream taskCodes based on the task contain 0, theTaskDefinitionCode:{}.", taskCode);
putMsg(result, Status.DATA_IS_NOT_VALID, "currentUpstreamList");
return result;
}
@@ -313,12 +340,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
tmpCurrent.removeAll(preTaskCodeList);
preTaskCodeList.removeAll(currentUpstreamList);
if (!preTaskCodeList.isEmpty()) {
- putMsg(result, Status.DATA_IS_NOT_VALID, StringUtils.join(preTaskCodeList, Constants.COMMA));
+ String invalidPreTaskCodes = StringUtils.join(preTaskCodeList, Constants.COMMA);
+ logger.error("Some upstream taskCodes are invalid, preTaskCodeList:{}.", invalidPreTaskCodes);
+ putMsg(result, Status.DATA_IS_NOT_VALID, invalidPreTaskCodes);
return result;
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(upstreamList.get(0).getProcessDefinitionCode());
if (processDefinition == null) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", upstreamList.get(0).getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(upstreamList.get(0).getProcessDefinitionCode()));
return result;
@@ -367,24 +397,28 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
if (StringUtils.isEmpty(postTaskCodes)) {
+ logger.warn("Parameter postTaskCodes is empty.");
putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
return result;
}
List<ProcessTaskRelation> downstreamList =
processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
if (CollectionUtils.isEmpty(downstreamList)) {
+ logger.error("Downstream tasks based on the task do not exist, theTaskDefinitionCode:{}.", taskCode);
putMsg(result, Status.DATA_IS_NULL, "taskCode");
return result;
}
List<Long> postTaskCodeList = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream()
.map(Long::parseLong).collect(Collectors.toList());
if (postTaskCodeList.contains(0L)) {
+ logger.warn("Parameter postTaskCodes contains 0.");
putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
return result;
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(downstreamList.get(0).getProcessDefinitionCode());
if (processDefinition == null) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", downstreamList.get(0).getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(downstreamList.get(0).getProcessDefinitionCode()));
return result;
@@ -496,6 +530,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
+ logger.error("Process definition does not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
@@ -503,6 +538,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
+ logger.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinitionCode);
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result;
}
@@ -525,9 +561,13 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
if (processTaskRelation.getPreTaskCode() == preTaskCode) {
int delete = processTaskRelationMapper.deleteById(processTaskRelation.getId());
if (delete == 0) {
+ logger.error("Delete task relation edge error, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}",
+ processTaskRelation.getId(), preTaskCode, postTaskCode);
putMsg(result, Status.DELETE_EDGE_ERROR);
throw new ServiceException(Status.DELETE_EDGE_ERROR);
- }
+ } else
+ logger.info("Delete task relation edge complete, processTaskRelationId:{}, preTaskCode:{}, postTaskCode:{}",
+ processTaskRelation.getId(), preTaskCode, postTaskCode);
processTaskRelationList.remove(processTaskRelation);
}
}
@@ -537,6 +577,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelation.setPreTaskVersion(0);
processTaskRelation.setPreTaskCode(0L);
processTaskRelationList.add(processTaskRelation);
+ logger.info("Delete task relation through set invalid value for it: preTaskCode from {} to 0, processTaskRelationId:{}.",
+ preTaskCode, processTaskRelation.getId());
}
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
index 7fb867d4de..59d4329047 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
@@ -111,6 +111,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
Project project = projectMapper.queryByName(name);
if (project != null) {
+ logger.warn("Project {} already exists.", project.getName());
putMsg(result, Status.PROJECT_ALREADY_EXISTS, name);
return result;
}
@@ -129,19 +130,21 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
.updateTime(now)
.build();
} catch (CodeGenerateException e) {
+ logger.error("Generate process definition code error.", e);
putMsg(result, Status.CREATE_PROJECT_ERROR);
return result;
}
if (projectMapper.insert(project) > 0) {
+ logger.info("Project is created and id is :{}", project.getId());
result.setData(project);
putMsg(result, Status.SUCCESS);
permissionPostHandle(AuthorizationType.PROJECTS, loginUser.getId(),
Collections.singletonList(project.getId()), logger);
} else {
+ logger.error("Project create error, projectName:{}.", project.getName());
putMsg(result, Status.CREATE_PROJECT_ERROR);
}
- logger.info("create project complete and id is :{}", project.getId());
return result;
}
@@ -153,6 +156,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
*/
public static void checkDesc(Result result, String desc) {
if (!StringUtils.isEmpty(desc) && desc.codePointCount(0, desc.length()) > 255) {
+ logger.warn("Parameter description check failed.");
result.setCode(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode());
result.setMsg(MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), "desc length"));
} else {
@@ -185,7 +189,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
public Map<String, Object> queryByName(User loginUser, String projectName) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
- boolean hasProjectAndPerm = hasProjectAndPerm(loginUser, project, result, PROJECT);
+ boolean hasProjectAndPerm = hasProjectAndPerm(loginUser, project, result,PROJECT);
if (!hasProjectAndPerm) {
return result;
}
@@ -209,10 +213,12 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
String permission) {
Map<String, Object> result = new HashMap<>();
if (project == null) {
+ logger.error("Project does not exist, projectCode:{}.", projectCode);
putMsg(result, Status.PROJECT_NOT_EXIST);
} else if (!canOperatorPermissions(loginUser, new Object[]{project.getId()}, AuthorizationType.PROJECTS,
permission)) {
// check read permission
+ logger.error("User does not have {} permission to operate project, userName:{}, projectCode:{}.", permission, loginUser.getUserName(), projectCode);
putMsg(result, Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(), projectCode);
} else {
putMsg(result, Status.SUCCESS);
@@ -236,9 +242,11 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
public boolean hasProjectAndPerm(User loginUser, Project project, Map<String, Object> result, String permission) {
boolean checkResult = false;
if (project == null) {
+ logger.error("Project does not exist.");
putMsg(result, Status.PROJECT_NOT_FOUND, "");
- } else if (!canOperatorPermissions(loginUser, new Object[]{project.getId()}, AuthorizationType.PROJECTS,
+ } else if (!canOperatorPermissions(loginUser, new Object[] {project.getId()}, AuthorizationType.PROJECTS,
permission)) {
+ logger.error("User does not have {} permission to operate project, userName:{}, projectCode:{}.", permission, loginUser.getUserName(), project.getCode());
putMsg(result, Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(), project.getCode());
} else {
checkResult = true;
@@ -250,9 +258,11 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
public boolean hasProjectAndPerm(User loginUser, Project project, Result result, String permission) {
boolean checkResult = false;
if (project == null) {
+ logger.error("Project does not exist.");
putMsg(result, Status.PROJECT_NOT_FOUND, "");
- } else if (!canOperatorPermissions(loginUser, new Object[]{project.getId()}, AuthorizationType.PROJECTS,
+ } else if (!canOperatorPermissions(loginUser, new Object[] {project.getId()}, AuthorizationType.PROJECTS,
permission)) {
+ logger.error("User does not have {} permission to operate project, userName:{}, projectCode:{}.", permission, loginUser.getUserName(), project.getCode());
putMsg(result, Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(), project.getName());
} else {
checkResult = true;
@@ -320,14 +330,17 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
processDefinitionMapper.queryAllDefinitionList(project.getCode());
if (!processDefinitionList.isEmpty()) {
+ logger.warn("Please delete the process definitions in project first! project code:{}.", projectCode);
putMsg(result, Status.DELETE_PROJECT_ERROR_DEFINES_NOT_NULL);
return result;
}
int delete = projectMapper.deleteById(project.getId());
if (delete > 0) {
+ logger.info("Project is deleted and id is :{}.", project.getId());
result.setData(Boolean.TRUE);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Project delete error, project code:{}, project name:{}.", projectCode, project.getName());
putMsg(result, Status.DELETE_PROJECT_ERROR);
}
return result;
@@ -340,9 +353,8 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
* @param project project
* @return check result
*/
- private Map<String, Object> getCheckResult(User loginUser, Project project, String perm) {
- Map<String, Object> checkResult =
- checkProjectAndAuth(loginUser, project, project == null ? 0L : project.getCode(), perm);
+ private Map<String, Object> getCheckResult(User loginUser, Project project,String perm) {
+ Map<String, Object> checkResult = checkProjectAndAuth(loginUser, project, project == null ? 0L : project.getCode(),perm);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
@@ -381,6 +393,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
}
User user = userMapper.queryByUserNameAccurately(userName);
if (user == null) {
+ logger.error("User does not exist.");
putMsg(result, Status.USER_NOT_EXIST, userName);
return result;
}
@@ -390,14 +403,17 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
project.setUserId(user.getId());
int update = projectMapper.updateById(project);
if (update > 0) {
+ logger.info("Project is updated and id is :{}", project.getId());
result.setData(project);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Project update error, projectCode:{}, projectName:{}.", project.getCode(), project.getName());
putMsg(result, Status.UPDATE_PROJECT_ERROR);
}
return result;
}
+
/**
* query unauthorized project
*
@@ -409,16 +425,13 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
public Result queryUnauthorizedProject(User loginUser, Integer userId) {
Result result = new Result();
- Set<Integer> projectIds = resourcePermissionCheckService
- .userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), logger);
+ Set<Integer> projectIds = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), logger);
if (projectIds.isEmpty()) {
result.setData(Collections.emptyList());
putMsg(result, Status.SUCCESS);
return result;
}
- List<Project> projectList = projectMapper.listAuthorizedProjects(
- loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(),
- new ArrayList<>(projectIds));
+ List<Project> projectList = projectMapper.listAuthorizedProjects(loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : loginUser.getId(), new ArrayList<>(projectIds));
List<Project> resultList = new ArrayList<>();
Set<Project> projectSet;
@@ -439,7 +452,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
*
* @param projectSet project set
* @param authedProjectList authed project list
- * @return project list that authorization
+ * @return list of projects that are not authorized
*/
private List<Project> getUnauthorizedProjects(Set<Project> projectSet, List<Project> authedProjectList) {
List<Project> resultList;
@@ -447,7 +460,6 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
if (authedProjectList != null && !authedProjectList.isEmpty()) {
authedProjectSet = new HashSet<>(authedProjectList);
projectSet.removeAll(authedProjectSet);
-
}
resultList = new ArrayList<>(projectSet);
return resultList;
@@ -523,8 +535,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
public Result queryProjectCreatedAndAuthorizedByUser(User loginUser) {
Result result = new Result();
- Set<Integer> projectIds = resourcePermissionCheckService
- .userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), logger);
+ Set<Integer> projectIds = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.PROJECTS, loginUser.getId(), logger);
if (projectIds.isEmpty()) {
result.setData(Collections.emptyList());
putMsg(result, Status.SUCCESS);
@@ -538,6 +549,18 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
return result;
}
+ /**
+ * check whether the user has read permission on the project
+ *
+ * @param user user
+ * @param project project
+ * @return true if the user has permission to see the project, otherwise false
+ */
+ private boolean checkReadPermission(User user, Project project) {
+ int permissionId = queryPermission(user, project);
+ return (permissionId & Constants.READ_PERMISSION) != 0;
+ }
+
/**
* query permission id
*
@@ -572,8 +595,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
@Override
public Result queryAllProjectList(User user) {
Result result = new Result();
- List<Project> projects =
- projectMapper.queryAllProject(user.getUserType() == UserType.ADMIN_USER ? 0 : user.getId());
+ List<Project> projects = projectMapper.queryAllProject(user.getUserType() == UserType.ADMIN_USER ? 0 : user.getId());
result.setData(projects);
putMsg(result, Status.SUCCESS);
@@ -593,6 +615,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
public void checkProjectAndAuth(Result result, User loginUser, Project project, long projectCode,
String permission) {
if (project == null) {
+ logger.error("Project does not exist, project code:{}.", projectCode);
putMsg(result, Status.PROJECT_NOT_EXIST);
} else if (!canOperatorPermissions(loginUser, new Object[]{project.getId()}, AuthorizationType.PROJECTS,
permission)) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
index 7eeb91396e..0bb8bd4284 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
@@ -180,6 +180,7 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
queueMapper.insert(queueObj);
result.setData(queueObj);
+ logger.info("Queue create complete, queueName:{}.", queueObj.getQueueName());
putMsg(result, Status.SUCCESS);
permissionPostHandle(AuthorizationType.QUEUE, loginUser.getId(), Collections.singletonList(queueObj.getId()), logger);
return result;
@@ -209,7 +210,7 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
if (checkIfQueueIsInUsing(existsQueue.getQueueName(), updateQueue.getQueueName())) {
//update user related old queue
Integer relatedUserNums = userMapper.updateUserQueue(existsQueue.getQueueName(), updateQueue.getQueueName());
- logger.info("old queue have related {} user, exec update user success.", relatedUserNums);
+ logger.info("Old queue have related {} users, exec update user success.", relatedUserNums);
}
queueMapper.updateById(updateQueue);
@@ -284,11 +285,13 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
public Queue createQueueIfNotExists(String queue, String queueName) {
Queue existsQueue = queueMapper.queryQueueName(queue, queueName);
if (!Objects.isNull(existsQueue)) {
+ logger.info("Queue exists, so return it, queueName:{}.", queueName);
return existsQueue;
}
Queue queueObj = new Queue(queueName, queue);
createQueueValid(queueObj);
queueMapper.insert(queueObj);
+ logger.info("Queue create complete, queueName:{}.", queueObj.getQueueName());
return queueObj;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
index b863bebec7..7cdce0bb4e 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
@@ -157,11 +157,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
if (FileUtils.directoryTraversal(name)) {
+ logger.warn("Parameter name is invalid, name:{}.", RegexUtils.escapeNRT(name));
putMsg(result, Status.VERIFY_PARAMETER_NAME_FAILED);
return result;
}
if (checkDescriptionLength(description)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -173,7 +175,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
}
if (checkResourceExists(fullName, type.ordinal())) {
- logger.error("resource directory {} has exist, can't recreate", fullName);
+ logger.warn("Resource directory exists, can not create again, fullName:{}.", fullName);
putMsg(result, Status.RESOURCE_EXIST);
return result;
}
@@ -195,11 +197,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
}
result.setData(resultMap);
} catch (DuplicateKeyException e) {
- logger.error("resource directory {} has exist, can't recreate", fullName);
+ logger.warn("Resource directory exists, can not create again, fullName:{}.", fullName);
putMsg(result, Status.RESOURCE_EXIST);
return result;
} catch (Exception e) {
- logger.error("resource already exists, can't recreate ", e);
+ logger.warn("Resource exists, can not create again, fullName:{}.", fullName, e);
throw new ServiceException("resource already exists, can't recreate");
}
// create directory in storage
@@ -252,6 +254,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
if (checkDescriptionLength(desc)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -259,6 +262,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// make sure login user has tenant
String tenantCode = getTenantCode(loginUser.getId(), result);
if (StringUtils.isEmpty(tenantCode)) {
+ logger.error("Tenant of current login user does not specified, loginUserName:{}.", loginUser.getUserName());
return result;
}
@@ -270,13 +274,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// check resource name exists
String fullName = getFullName(currentDir, name);
if (checkResourceExists(fullName, type.ordinal())) {
- logger.error("resource {} has exist, can't recreate", RegexUtils.escapeNRT(name));
+ logger.warn("Resource exists, can not create again, fullName:{}.", RegexUtils.escapeNRT(name));
putMsg(result, Status.RESOURCE_EXIST);
return result;
}
if (fullName.length() > Constants.RESOURCE_FULL_NAME_MAX_LENGTH) {
- logger.error("resource {}'s full name {}' is longer than the max length {}", RegexUtils.escapeNRT(name),
- fullName, Constants.RESOURCE_FULL_NAME_MAX_LENGTH);
+ logger.warn("Resource file's name is longer than max full name length, fullName:{}, fullNameSize:{}, maxFullNameSize:{}",
+ RegexUtils.escapeNRT(name), fullName.length(), Constants.RESOURCE_FULL_NAME_MAX_LENGTH);
putMsg(result, Status.RESOURCE_FULL_NAME_TOO_LONG_ERROR);
return result;
}
@@ -298,18 +302,19 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
}
result.setData(resultMap);
} catch (Exception e) {
- logger.error("resource already exists, can't recreate ", e);
+ logger.warn("Resource exists, can not create again, fullName:{}.", fullName, e);
throw new ServiceException("resource already exists, can't recreate");
}
// fail upload
if (!upload(loginUser, fullName, file, type)) {
- logger.error("upload resource: {} file: {} failed.", RegexUtils.escapeNRT(name),
- RegexUtils.escapeNRT(file.getOriginalFilename()));
+ logger.error("Upload resource file failed, resourceName:{}, fileName:{}.",
+ RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(file.getOriginalFilename()));
putMsg(result, Status.STORE_OPERATE_CREATE_ERROR);
- throw new ServiceException(
- String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
- }
+ throw new ServiceException(String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
+ } else
+ logger.info("Upload resource file complete, resourceName:{}, fileName:{}.",
+ RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(file.getOriginalFilename()));
return result;
}
@@ -335,6 +340,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
parentResource.setSize(0L);
}
resourcesMapper.updateById(parentResource);
+ logger.info("Resource size update complete, resourceFullName:{}, newSize:{}.", parentResource.getFullName(), parentResource.getSize());
}
}
}
@@ -388,26 +394,31 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
Resource resource = resourcesMapper.selectById(resourceId);
if (resource == null) {
+ logger.error("Resource does not exist, resourceId:{}.", resourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
if (checkDescriptionLength(desc)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
if (!PropertyUtils.getResUploadStartupState()) {
+ logger.error("Storage does not start up, resource upload startup state: {}.", PropertyUtils.getResUploadStartupState());
putMsg(result, Status.STORAGE_NOT_STARTUP);
return result;
}
if (resource.isDirectory() && storageOperate.returnStorageType().equals(ResUploadType.S3)
&& !resource.getFileName().equals(name)) {
+ logger.warn("Directory in S3 storage can not be renamed.");
putMsg(result, Status.S3_CANNOT_RENAME);
return result;
}
if (file == null && name.equals(resource.getAlias()) && desc.equals(resource.getDescription())) {
+ logger.info("Resource does not need to be updated due to no change, resourceId:{}.", resourceId);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -419,7 +430,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String fullName = String.format(FORMAT_SS,
originFullName.substring(0, originFullName.lastIndexOf(FOLDER_SEPARATOR) + 1), name);
if (!originResourceName.equals(name) && checkResourceExists(fullName, type.ordinal())) {
- logger.error("resource {} already exists, can't recreate", name);
+ logger.warn("Resource exists, can not create again, fullName:{}.", RegexUtils.escapeNRT(name));
putMsg(result, Status.RESOURCE_EXIST);
return result;
}
@@ -432,6 +443,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// query tenant by user id
String tenantCode = getTenantCode(resource.getUserId(), result);
if (StringUtils.isEmpty(tenantCode)) {
+ logger.error("Tenant of current login user does not specified, loginUserName:{}.", loginUser.getUserName());
return result;
}
// verify whether the resource exists in storage
@@ -439,12 +451,14 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String originFileName = storageOperate.getFileName(resource.getType(), tenantCode, originFullName);
try {
if (!storageOperate.exists(tenantCode, originFileName)) {
- logger.error("{} not exist", originFileName);
+ logger.error("Resource file does not exist in {} storage, tenantCode:{}, resourceId:{}, originFileName:{}.",
+ resource.getType(), tenantCode, resourceId, originFileName);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
} catch (IOException e) {
- logger.error(e.getMessage(), e);
+ logger.error("Storage operation error, storageType:{}, tenantCode:{}, resourceId:{}, originFileName:{}.",
+ resource.getType(), tenantCode, resourceId, originFileName, e);
throw new ServiceException(Status.HDFS_OPERATION_ERROR);
}
@@ -553,8 +567,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (file != null) {
// fail upload
if (!upload(loginUser, fullName, file, type)) {
- logger.error("upload resource: {} file: {} failed.", name,
- RegexUtils.escapeNRT(file.getOriginalFilename()));
+ logger.error("Storage operation error, resourceId:{}, resourceName:{}, originFileName:{}.",
+ resourceId, name, RegexUtils.escapeNRT(file.getOriginalFilename()));
putMsg(result, Status.HDFS_OPERATION_ERROR);
throw new ServiceException(
String.format("upload resource: %s file: %s failed.", name, file.getOriginalFilename()));
@@ -563,7 +577,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
try {
storageOperate.delete(tenantCode, originFileName, false);
} catch (IOException e) {
- logger.error(e.getMessage(), e);
+ logger.error("Resource delete error, resourceFullName:{}.", originFullName, e);
throw new ServiceException(String.format("delete resource: %s failed.", originFullName));
}
}
@@ -576,10 +590,10 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String destHdfsFileName = storageOperate.getFileName(resource.getType(), tenantCode, fullName);
try {
- logger.info("start copy {} -> {}", originFileName, destHdfsFileName);
+ logger.info("Start copy file {} -> {}.", originFileName, destHdfsFileName);
storageOperate.copy(originFileName, destHdfsFileName, true, true);
} catch (Exception e) {
- logger.error(MessageFormat.format(" copy {0} -> {1} fail", originFileName, destHdfsFileName), e);
+ logger.error(MessageFormat.format("Copy file {0} -> {1} fail.", originFileName, destHdfsFileName), e);
putMsg(result, Status.HDFS_COPY_FAIL);
throw new ServiceException(Status.HDFS_COPY_FAIL);
}
@@ -592,13 +606,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
putMsg(result, Status.SUCCESS);
if (FileUtils.directoryTraversal(name)) {
- logger.error("file alias name {} verify failed", name);
+ logger.warn("Parameter file alias name verify failed, fileAliasName:{}.", RegexUtils.escapeNRT(name));
putMsg(result, Status.VERIFY_PARAMETER_NAME_FAILED);
return result;
}
if (file != null && FileUtils.directoryTraversal(Objects.requireNonNull(file.getOriginalFilename()))) {
- logger.error("file original name {} verify failed", file.getOriginalFilename());
+ logger.warn("File original name verify failed, fileOriginalName:{}.", RegexUtils.escapeNRT(file.getOriginalFilename()));
putMsg(result, Status.VERIFY_PARAMETER_NAME_FAILED);
return result;
}
@@ -606,7 +620,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (file != null) {
// file is empty
if (file.isEmpty()) {
- logger.error("file is empty: {}", RegexUtils.escapeNRT(file.getOriginalFilename()));
+ logger.warn("Parameter file is empty, fileOriginalName:{}.", RegexUtils.escapeNRT(file.getOriginalFilename()));
putMsg(result, Status.RESOURCE_FILE_IS_EMPTY);
return result;
}
@@ -618,7 +632,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// determine file suffix
if (!fileSuffix.equalsIgnoreCase(nameSuffix)) {
// rename file suffix and original suffix must be consistent
- logger.error("rename file suffix and original suffix must be consistent: {}",
+ logger.warn("Rename file suffix and original suffix must be consistent, fileOriginalName:{}.",
RegexUtils.escapeNRT(file.getOriginalFilename()));
putMsg(result, Status.RESOURCE_SUFFIX_FORBID_CHANGE);
return result;
@@ -626,12 +640,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// If resource type is UDF, only jar packages are allowed to be uploaded, and the suffix must be .jar
if (Constants.UDF.equals(type.name()) && !JAR.equalsIgnoreCase(fileSuffix)) {
- logger.error(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
+ logger.warn(Status.UDF_RESOURCE_SUFFIX_NOT_JAR.getMsg());
putMsg(result, Status.UDF_RESOURCE_SUFFIX_NOT_JAR);
return result;
}
if (file.getSize() > Constants.MAX_FILE_SIZE) {
- logger.error("file size is too large: {}", RegexUtils.escapeNRT(file.getOriginalFilename()));
+ logger.warn("Resource file size is larger than max file size, fileOriginalName:{}, fileSize:{}, maxFileSize:{}.",
+ RegexUtils.escapeNRT(file.getOriginalFilename()), file.getSize(), Constants.MAX_FILE_SIZE);
putMsg(result, Status.RESOURCE_SIZE_EXCEED_LIMIT);
return result;
}
@@ -657,6 +672,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (directoryId != -1) {
Resource directory = resourcesMapper.selectById(directoryId);
if (directory == null) {
+ logger.error("Resource does not exist, resourceId:{}.", directoryId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -820,6 +836,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
Result<Object> resultCheck = new Result<>();
Resource resource = resourcesMapper.selectById(resourceId);
if (resource == null) {
+ logger.error("Resource does not exist, resourceId:{}.", resourceId);
putMsg(resultCheck, Status.RESOURCE_NOT_EXIST);
return resultCheck;
}
@@ -844,10 +861,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String tenantCode = getTenantCode(resource.getUserId(), result);
if (StringUtils.isEmpty(tenantCode)) {
+ logger.error("Tenant of current login user does not specified, loginUserName:{}.", loginUser.getUserName());
return result;
}
- // get all resource id of process definitions those is released
+ // get all resource ids of process definitions that are released
List<Map<String, Object>> list = processDefinitionMapper.listResources();
Map<Integer, Set<Long>> resourceProcessMap =
ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
@@ -856,9 +874,9 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
List<Integer> allChildren = listAllChildren(resource, true);
Integer[] needDeleteResourceIdArray = allChildren.toArray(new Integer[allChildren.size()]);
-
+
if (needDeleteResourceIdArray.length >= 2) {
- logger.error("can't be deleted,because There are files or folders in the current directory:{}", resource);
+ logger.warn("Resource can not be deleted because there are files or folders {} in the current directory.", resource.getFileName());
putMsg(result, Status.RESOURCE_HAS_FOLDER, resource.getFileName());
return result;
}
@@ -866,23 +884,25 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// if resource type is UDF,need check whether it is bound by UDF function
if (resource.getType() == (ResourceType.UDF)) {
List<UdfFunc> udfFuncs = udfFunctionMapper.listUdfByResourceId(needDeleteResourceIdArray);
+ List<Integer> udfFuncIds = udfFuncs.stream().map(UdfFunc::getId).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(udfFuncs)) {
- logger.error("can't be deleted,because it is bound by UDF functions:{}", udfFuncs);
+ logger.warn("Resource can not be deleted because it is bound by UDF functions, udfFuncIds:{}", udfFuncIds);
putMsg(result, Status.UDF_RESOURCE_IS_BOUND, udfFuncs.get(0).getFuncName());
return result;
}
}
if (resourceIdSet.contains(resource.getPid())) {
- logger.error("can't be deleted,because it is used of process definition");
+ logger.warn("Resource can not be deleted because it is used by process definition, resourceId:{}, processDefinitionCode:{}.",
+ resource.getId(), resource.getPid());
putMsg(result, Status.RESOURCE_IS_USED);
return result;
}
resourceIdSet.retainAll(allChildren);
if (CollectionUtils.isNotEmpty(resourceIdSet)) {
- logger.error("can't be deleted,because it is used of process definition");
for (Integer resId : resourceIdSet) {
- logger.error("resource id:{} is used of process definition {}", resId, resourceProcessMap.get(resId));
+ logger.warn("Resource can not be deleted because it is used by process definition, resourceId:{}, processDefinitionCode:{}.",
+ resId, resourceProcessMap.get(resId));
}
putMsg(result, Status.RESOURCE_IS_USED);
return result;
@@ -901,6 +921,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// delete file on storage
storageOperate.delete(tenantCode, storageFilename, true);
+ logger.info("Resource delete complete, tenantCode:{}, fileName:{}.", tenantCode, storageFilename);
putMsg(result, Status.SUCCESS);
return result;
@@ -927,8 +948,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
}
putMsg(result, Status.SUCCESS);
if (checkResourceExists(fullName, type.ordinal())) {
- logger.error("resource type:{} name:{} has exist, can't create again.", type,
- RegexUtils.escapeNRT(fullName));
+ logger.warn("Resource with same name exists so can not create again, resourceType:{}, resourceName:{}.",
+ type, RegexUtils.escapeNRT(fullName));
putMsg(result, Status.RESOURCE_EXIST);
} else {
// query tenant
@@ -938,14 +959,17 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
try {
String filename = storageOperate.getFileName(type, tenantCode, fullName);
if (storageOperate.exists(tenantCode, filename)) {
+ logger.warn("Resource file with same name exists so can not create again, tenantCode:{}, resourceName:{}.",
+ tenantCode, RegexUtils.escapeNRT(filename));
putMsg(result, Status.RESOURCE_FILE_EXIST, filename);
}
} catch (Exception e) {
- logger.error("verify resource failed and the reason is {}", e.getMessage());
+ logger.error("Verify resource name failed, resourceName:{}.", RegexUtils.escapeNRT(fullName), e);
putMsg(result, Status.STORE_OPERATE_CREATE_ERROR);
}
} else {
+ logger.error("Tenant does not exist or tenant of current login user does not specified, loginUserName:{}.", loginUser.getUserName());
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
}
}
@@ -965,6 +989,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
public Result<Object> queryResource(User loginUser, String fullName, Integer id, ResourceType type) {
Result<Object> result = new Result<>();
if (StringUtils.isBlank(fullName) && id == null) {
+ logger.warn("Parameter fullName and id is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
return result;
}
@@ -972,6 +997,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (StringUtils.isNotBlank(fullName)) {
List<Resource> resourceList = resourcesMapper.queryResource(fullName, type.ordinal());
if (CollectionUtils.isEmpty(resourceList)) {
+ logger.error("Resources do not exist, fullName:{}.",
+ RegexUtils.escapeNRT(fullName));
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -979,11 +1006,14 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
} else {
resource = resourcesMapper.selectById(id);
if (resource == null) {
+ logger.error("Resource does not exist, resourceId:{}.", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
- resource = resourcesMapper.selectById(resource.getPid());
+ int pid = resource.getPid();
+ resource = resourcesMapper.selectById(pid);
if (resource == null) {
+ logger.error("Resource does not exist, resourceId:{}.", pid);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -1011,6 +1041,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
Result<Object> result = new Result<>();
Resource resource = resourcesMapper.selectById(id);
if (resource == null) {
+ logger.error("Resource does not exist, resourceId:{}.", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -1045,6 +1076,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// get resource by id
Resource resource = resourcesMapper.selectById(resourceId);
if (resource == null) {
+ logger.error("Resource does not exist, resourceId:{}.", resourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -1063,7 +1095,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (StringUtils.isNotEmpty(resourceViewSuffixes)) {
List<String> strList = Arrays.asList(resourceViewSuffixes.split(","));
if (!strList.contains(nameSuffix)) {
- logger.error("resource suffix {} not support view, resource id {}", nameSuffix, resourceId);
+ logger.warn("Resource suffix does not support view, resourceId:{}, suffix:{}.", resourceId, nameSuffix);
putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
return result;
}
@@ -1071,28 +1103,31 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String tenantCode = getTenantCode(resource.getUserId(), result);
if (StringUtils.isEmpty(tenantCode)) {
+ logger.error("Tenant of resource user does not specified, resourceUserIde:{}.", resource.getUserId());
return result;
}
// source path
String resourceFileName = storageOperate.getResourceFileName(tenantCode, resource.getFullName());
- logger.info("resource path is {}", resourceFileName);
+ logger.info("Resource file path is {}.", resourceFileName);
try {
if (storageOperate.exists(tenantCode, resourceFileName)) {
List<String> content = storageOperate.vimFile(tenantCode, resourceFileName, skipLineNum, limit);
+ logger.info("Vim file content in path {} success, tenantCode:{}, fileName:{}, skipLineNum:{}, limit:{}.",
+ resourceFileName, tenantCode, resourceFileName, skipLineNum, limit);
putMsg(result, Status.SUCCESS);
Map<String, Object> map = new HashMap<>();
map.put(ALIAS, resource.getAlias());
map.put(CONTENT, String.join("\n", content));
result.setData(map);
} else {
- logger.error("read file {} not exist in storage", resourceFileName);
+ logger.error("File does not exist in storage, fileName:{}.", resourceFileName);
putMsg(result, Status.RESOURCE_FILE_NOT_EXIST, resourceFileName);
}
} catch (Exception e) {
- logger.error("Resource {} read failed", resourceFileName, e);
+ logger.error("Resource file read error, fileName:{}.", resourceFileName, e);
putMsg(result, Status.HDFS_OPERATION_ERROR);
}
@@ -1129,10 +1164,12 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
if (FileUtils.directoryTraversal(fileName)) {
+ logger.warn("File name verify failed, fileName:{}.", RegexUtils.escapeNRT(fileName));
putMsg(result, Status.VERIFY_PARAMETER_NAME_FAILED);
return result;
}
if (checkDescriptionLength(desc)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -1143,7 +1180,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (StringUtils.isNotEmpty(resourceViewSuffixes)) {
List<String> strList = Arrays.asList(resourceViewSuffixes.split(","));
if (!strList.contains(nameSuffix)) {
- logger.error("resource suffix {} not support create", nameSuffix);
+ logger.warn("Resource suffix does not support view, suffix:{}.", nameSuffix);
putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
return result;
}
@@ -1238,8 +1275,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
User user = userMapper.queryByUserNameAccurately(userName);
int suffixLabelIndex = fullName.indexOf(PERIOD);
if (suffixLabelIndex == -1) {
- String msg = String.format("The suffix of file can not be empty : %s", fullName);
- logger.error(msg);
+ String msg = String.format("The suffix of file can not be empty, fullName:%s.", fullName);
+ logger.warn(msg);
throw new IllegalArgumentException(msg);
}
if (!fullName.startsWith(FOLDER_SEPARATOR)) {
@@ -1251,7 +1288,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
Map<String, Object> resultMap = (Map<String, Object>) createResult.getData();
return (int) resultMap.get("id");
}
- String msg = String.format("Can not create or update resource : %s", fullName);
+ String msg = String.format("Create or update resource error, resourceName:%s.", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
@@ -1269,7 +1306,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
Map<String, Object> resultMap = (Map<String, Object>) createDirResult.getData();
return resultMap.get("id") == null ? -1 : (Integer) resultMap.get("id");
} else {
- String msg = String.format("Can not create dir %s", dirFullName);
+ String msg = String.format("Create dir error, dirFullName:%s.", dirFullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
@@ -1288,7 +1325,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
putMsg(result, Status.SUCCESS);
// if resource upload startup
if (!PropertyUtils.getResUploadStartupState()) {
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
+ logger.error("Storage does not start up, resource upload startup state: {}.", PropertyUtils.getResUploadStartupState());
putMsg(result, Status.STORAGE_NOT_STARTUP);
return result;
}
@@ -1309,10 +1346,12 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (pid != -1) {
Resource parentResource = resourcesMapper.selectById(pid);
if (parentResource == null) {
+ logger.error("Parent resource does not exist, parentResourceId:{}.", pid);
putMsg(result, Status.PARENT_RESOURCE_NOT_EXIST);
return result;
}
if (!canOperator(loginUser, parentResource.getUserId())) {
+ logger.warn("User does not have operation privilege, loginUserName:{}.", loginUser.getUserName());
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -1337,7 +1376,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
Resource resource = resourcesMapper.selectById(resourceId);
if (resource == null) {
- logger.error("read file not exist, resource id {}", resourceId);
+ logger.error("Resource does not exist, resourceId:{}.", resourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -1356,8 +1395,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (StringUtils.isNotEmpty(resourceViewSuffixes)) {
List<String> strList = Arrays.asList(resourceViewSuffixes.split(","));
if (!strList.contains(nameSuffix)) {
- logger.error("resource suffix {} not support updateProcessInstance, resource id {}", nameSuffix,
- resourceId);
+ logger.warn("Resource suffix does not support view, resourceId:{}, suffix:{}.", resourceId, nameSuffix);
putMsg(result, Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW);
return result;
}
@@ -1365,6 +1403,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String tenantCode = getTenantCode(resource.getUserId(), result);
if (StringUtils.isEmpty(tenantCode)) {
+ logger.error("Tenant of resource user does not specified, resourceUserIde:{}.", resource.getUserId());
return result;
}
long originFileSize = resource.getSize();
@@ -1377,7 +1416,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
throw new ServiceException(result.getMsg());
- }
+ } else
+ logger.info("Update resource content complete, resourceId:{}.", resourceId);
return result;
}
@@ -1397,7 +1437,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
if (!FileUtils.writeContent2File(content, localFilename)) {
// write file fail
- logger.error("file {} fail, content is {}", localFilename, RegexUtils.escapeNRT(content));
+ logger.error("Write file error, fileName:{}, content:{}.", localFilename, RegexUtils.escapeNRT(content));
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -1405,11 +1445,12 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
// get resource file path
storageFileName = storageOperate.getResourceFileName(tenantCode, resourceName);
String resourcePath = storageOperate.getResDir(tenantCode);
- logger.info("resource path is {}, resource dir is {}", storageFileName, resourcePath);
+ logger.info("Resource file name is {}, resource dir is {}.", storageFileName, resourcePath);
if (!storageOperate.exists(tenantCode, resourcePath)) {
// create if tenant dir not exists
storageOperate.createTenantDirIfNotExists(tenantCode);
+ logger.info("Create tenant dir because path {} does not exist, tenantCode:{}.", resourcePath, tenantCode);
}
if (storageOperate.exists(tenantCode, storageFileName)) {
storageOperate.delete(tenantCode, storageFileName, false);
@@ -1417,11 +1458,12 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
storageOperate.upload(tenantCode, localFilename, storageFileName, true, true);
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.error("Upload content to storage error, tenantCode:{}, destFileName:{}.", tenantCode, storageFileName, e);
result.setCode(Status.HDFS_OPERATION_ERROR.getCode());
result.setMsg(String.format("copy %s to hdfs %s fail", localFilename, storageFileName));
return result;
}
+ logger.info("Upload content to storage complete, tenantCode:{}, destFileName:{}.", tenantCode, storageFileName);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -1437,13 +1479,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
public org.springframework.core.io.Resource downloadResource(User loginUser, int resourceId) throws IOException {
// if resource upload startup
if (!PropertyUtils.getResUploadStartupState()) {
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
+ logger.warn("Storage does not start up, resource upload startup state: {}.", PropertyUtils.getResUploadStartupState());
throw new ServiceException("hdfs not startup");
}
Resource resource = resourcesMapper.selectById(resourceId);
if (resource == null) {
- logger.error("download file not exist, resource id {}", resourceId);
+ logger.error("Resource does not exist, resourceId:{}.", resourceId);
return null;
}
@@ -1458,22 +1500,21 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
throw new ServiceException(Status.NO_CURRENT_OPERATING_PERMISSION.getMsg());
}
if (resource.isDirectory()) {
- logger.error("resource id {} is directory,can't download it", resourceId);
+ logger.warn("Resource is a directory so can not download it, resourceId:{}.", resourceId);
throw new ServiceException("can't download directory");
}
int userId = resource.getUserId();
User user = userMapper.selectById(userId);
if (user == null) {
- logger.error("user id {} not exists", userId);
- throw new ServiceException(String.format("resource owner id %d not exist", userId));
+ logger.error("User does not exits, userId:{}.", userId);
+ throw new ServiceException(String.format("resource owner id %d does not exist", userId));
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
- logger.error("tenant id {} not exists", user.getTenantId());
- throw new ServiceException(
- String.format("The tenant id %d of resource owner not exist", user.getTenantId()));
+ logger.error("Tenant does not exists, tenantId:{}.", user.getTenantId());
+ throw new ServiceException(String.format("The tenant id %d of resource owner does not exist", user.getTenantId()));
}
String tenantCode = tenant.getTenantCode();
@@ -1481,14 +1522,17 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String fileName = storageOperate.getFileName(resource.getType(), tenantCode, resource.getFullName());
String localFileName = FileUtils.getDownloadFilename(resource.getAlias());
- logger.info("resource path is {}, download local filename is {}", fileName, localFileName);
+ logger.info("Resource path is {}, download local filename is {}.", fileName, localFileName);
try {
storageOperate.download(tenantCode, fileName, localFileName, false, true);
- return org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
+ org.springframework.core.io.Resource file2Resource = org.apache.dolphinscheduler.api.utils.FileUtils.file2Resource(localFileName);
+ if (file2Resource != null) {
+ logger.info("Download resource complete, path:{}, localFileName:{}.", fileName, localFileName);
+ }
+ return file2Resource;
} catch (IOException e) {
- logger.error("download resource error, the path is {}, and local filename is {}, the error message is {}",
- fileName, localFileName, e.getMessage());
+ logger.error("Download resource error, path:{}, localFileName:{}.", fileName, localFileName, e);
throw new ServerException("download the resource file failed ,it may be related to your storage");
}
}
@@ -1534,7 +1578,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
User user = userMapper.queryByUserNameAccurately(userName);
Result<Object> resourceResponse = this.queryResource(user, fullName, null, ResourceType.FILE);
if (resourceResponse.getCode() != Status.SUCCESS.getCode()) {
- String msg = String.format("Can not find valid resource by name %s", fullName);
+ String msg = String.format("Query resource by fullName failed, userName:%s, fullName:%s", userName, fullName);
+ logger.error(msg);
throw new IllegalArgumentException(msg);
}
return (Resource) resourceResponse.getData();
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 677d580e2d..633760121b 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -165,7 +165,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
- logger.warn("The start time must not be the same as the end");
+ logger.warn("The start time must not be the same as the end or time can not be null.");
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
@@ -178,8 +178,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
scheduleObj.setStartTime(scheduleParam.getStartTime());
scheduleObj.setEndTime(scheduleParam.getEndTime());
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
- logger.error("{} verify failure", scheduleParam.getCrontab());
-
+ logger.error("Schedule crontab verify failure, crontab:{}.", scheduleParam.getCrontab());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab());
return result;
}
@@ -207,7 +206,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// return scheduler object with ID
result.put(Constants.DATA_LIST, scheduleMapper.selectById(scheduleObj.getId()));
putMsg(result, Status.SUCCESS);
-
+ logger.info("Schedule create complete, projectCode:{}, processDefinitionCode:{}, scheduleId:{}.",
+ projectCode, processDefineCode, scheduleObj.getId());
result.put("scheduleId", scheduleObj.getId());
return result;
}
@@ -253,12 +253,14 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Schedule schedule = scheduleMapper.selectById(id);
if (schedule == null) {
+ logger.error("Schedule does not exist, scheduleId:{}.", id);
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", schedule.getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(schedule.getProcessDefinitionCode()));
return result;
}
@@ -296,33 +298,36 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) {
+ logger.error("Schedule does not exist, scheduleId:{}.", id);
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
return result;
}
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
- logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
- scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
+ logger.warn("Schedule state does not need to change due to schedule state is already {}, scheduleId:{}.",
+ scheduleObj.getReleaseState().getDescp(), scheduleObj.getId());
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
return result;
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", scheduleObj.getProcessDefinitionCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
return result;
}
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
if (processTaskRelations.isEmpty()) {
+ logger.error("Process task relations do not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, processDefinition.getCode());
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
- logger.info("not release process definition id: {} , name : {}", processDefinition.getId(),
- processDefinition.getName());
+ logger.warn("Only process definition state is {} can change schedule state, processDefinitionCode:{}.",
+ ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
return result;
}
@@ -330,6 +335,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
List<Long> subProcessDefineCodes = new ArrayList<>();
processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
if (!subProcessDefineCodes.isEmpty()) {
+ logger.info("Need to check sub process definition state before change schedule state, subProcessDefineCodes:{}.",
+ org.apache.commons.lang.StringUtils.join(subProcessDefineCodes, ","));
List<ProcessDefinition> subProcessDefinitionList =
processDefinitionMapper.queryByCodes(subProcessDefineCodes);
if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
@@ -338,8 +345,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
* if there is no online process, exit directly
*/
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
- logger.info("not release process definition id: {} , name : {}",
- subProcessDefinition.getId(), subProcessDefinition.getName());
+ logger.warn("Only sub process definition state is {} can change schedule state, subProcessDefinitionCode:{}.",
+ ReleaseState.ONLINE.getDescp(), subProcessDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
String.valueOf(subProcessDefinition.getId()));
return result;
@@ -353,6 +360,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
List<Server> masterServers = monitorService.getServerListFromRegistry(true);
if (masterServers.isEmpty()) {
+ logger.error("Master does not exist.");
putMsg(result, Status.MASTER_NOT_EXISTS);
return result;
}
@@ -379,6 +387,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
} catch (Exception e) {
+ logger.error("Set schedule state to {} error, projectCode:{}, scheduleId:{}.", scheduleStatus.getDescp(),
+ projectCode, scheduleObj.getId());
Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
: Status.OFFLINE_SCHEDULE_ERROR;
result.put(Constants.STATUS, status);
@@ -415,6 +425,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefineCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefineCode));
return result;
}
@@ -474,7 +485,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
public void setSchedule(int projectId, Schedule schedule) {
- logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());
+ logger.info("Set schedule state {}, project id: {}, scheduleId: {}", schedule.getReleaseState().getDescp(), projectId, schedule.getId());
schedulerApi.insertOrUpdateScheduleTask(projectId, schedule);
}
@@ -487,7 +498,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
*/
@Override
public void deleteSchedule(int projectId, int scheduleId) {
- logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);
+ logger.info("Delete schedule of project, projectId:{}, scheduleId:{}", projectId, scheduleId);
schedulerApi.deleteScheduleTask(projectId, scheduleId);
}
@@ -531,18 +542,21 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Schedule schedule = scheduleMapper.selectById(scheduleId);
if (schedule == null) {
+ logger.error("Schedule does not exist, scheduleId:{}.", scheduleId);
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId);
return result;
}
// Determine if the login user is the owner of the schedule
if (loginUser.getId() != schedule.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) {
+ logger.warn("User does not have permission to delete schedule, loginUserName:{}, scheduleId:{}.", loginUser.getUserName(), scheduleId);
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
// check schedule is already online
if (schedule.getReleaseState() == ReleaseState.ONLINE) {
+ logger.warn("Only {} state schedule can be deleted, scheduleId:{}.", ReleaseState.OFFLINE.getDescp(), scheduleId);
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
return result;
}
@@ -550,8 +564,10 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
int delete = scheduleMapper.deleteById(scheduleId);
if (delete > 0) {
+ logger.info("Schedule delete complete, scheduleId:{}.", scheduleId);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Schedule delete error, scheduleId:{}.", scheduleId);
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
return result;
@@ -580,7 +596,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
try {
cron = CronUtils.parse2Cron(scheduleParam.getCrontab());
} catch (CronParseException e) {
- logger.error(e.getMessage(), e);
+ logger.error("Parse cron to cron expression error, crontab:{}.", scheduleParam.getCrontab(), e);
putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
return result;
}
@@ -627,12 +643,14 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// check schedule exists
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode);
if (schedule == null) {
+ logger.error("Schedule of process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, processDefinitionCode);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
@@ -647,7 +665,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
FailureStrategy failureStrategy, Priority processInstancePriority, String workerGroup,
long environmentCode) {
if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE,
- Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
+ Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
+ logger.warn("Schedule can not be updated due to schedule is {}, scheduleId:{}.", ReleaseState.ONLINE.getDescp(), schedule.getId());
return;
}
@@ -657,11 +676,12 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
if (!StringUtils.isEmpty(scheduleExpression)) {
ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
if (scheduleParam == null) {
+ logger.warn("Parameter scheduleExpression is invalid, so parse cron error.");
putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
return;
}
if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
- logger.warn("The start time must not be the same as the end");
+ logger.warn("The start time must not be the same as the end or time can not be null.");
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return;
}
@@ -674,6 +694,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
schedule.setStartTime(scheduleParam.getStartTime());
schedule.setEndTime(scheduleParam.getEndTime());
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
+ logger.error("Schedule crontab verify failure, crontab:{}.", scheduleParam.getCrontab());
putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
return;
}
@@ -701,6 +722,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
processDefinitionMapper.updateById(processDefinition);
+ logger.info("Schedule update complete, projectCode:{}, processDefinitionCode:{}, scheduleId:{}.",
+ processDefinition.getProjectCode(), processDefinition.getCode(), schedule.getId());
putMsg(result, Status.SUCCESS);
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SessionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SessionServiceImpl.java
index cb8442c324..6734a21990 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SessionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SessionServiceImpl.java
@@ -75,7 +75,7 @@ public class SessionServiceImpl extends BaseServiceImpl implements SessionServic
}
String ip = BaseController.getClientIpAddress(request);
- logger.debug("get session: {}, ip: {}", sessionId, ip);
+ logger.debug("Get session: {}, ip: {}.", sessionId, ip);
return sessionMapper.selectById(sessionId);
}
@@ -156,7 +156,7 @@ public class SessionServiceImpl extends BaseServiceImpl implements SessionServic
//delete session
sessionMapper.deleteById(session.getId());
} catch (Exception e) {
- logger.warn("userId : {} , ip : {} , find more one session", loginUser.getId(), ip);
+ logger.warn("userId : {} , ip : {} , find more one session", loginUser.getId(), ip, e);
}
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 87ba643188..d7381f9256 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -130,7 +130,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
if (taskDefinitionLogs.isEmpty()) {
- logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson);
+ logger.warn("Parameter taskDefinitionJson is invalid.");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return result;
}
@@ -140,13 +140,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())
.build())) {
- logger.error("task definition {} parameter invalid", taskDefinitionLog.getName());
+ logger.warn("Task definition {} parameters are invalid.", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result;
}
}
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs, Boolean.TRUE);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
+ logger.error("Create task definition error, projectCode:{}.", projectCode);
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
@@ -183,16 +184,19 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(processDefinitionCode));
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
+ logger.warn("Task definition can not be created due to process definition is {}, processDefinitionCode:{}.",
+ ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, String.valueOf(processDefinitionCode));
return result;
}
TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinition == null) {
- logger.error("taskDefinitionJsonObj is not valid json");
+ logger.warn("Parameter taskDefinitionJsonObj is invalid json.");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
return result;
}
@@ -201,7 +205,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
.taskParams(taskDefinition.getTaskParams())
.dependence(taskDefinition.getDependence())
.build())) {
- logger.error("task definition {} parameter invalid", taskDefinition.getName());
+ logger.error("Task definition {} parameters are invalid", taskDefinition.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
return result;
}
@@ -211,7 +215,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskCode = CodeGenerateUtils.getInstance().genCode();
taskDefinition.setCode(taskCode);
} catch (CodeGenerateException e) {
- logger.error("Task code get error, ", e);
+ logger.error("Generate task definition code error.", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, taskDefinitionJsonObj);
return result;
}
@@ -224,7 +228,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
// upstreamTaskCodes - queryUpStreamTaskCodes
Set<Long> diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)).collect(Collectors.toSet());
if (!diffCode.isEmpty()) {
- putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(diffCode, Constants.COMMA));
+ String taskCodes = StringUtils.join(diffCode, Constants.COMMA);
+ logger.error("Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}.", taskCodes);
+ putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCodes);
return result;
}
for (TaskDefinition upstreamTask : upstreamTaskDefinitionList) {
@@ -254,15 +260,19 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(),
processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE);
if (insertResult != Constants.EXIT_CODE_SUCCESS) {
+ logger.error("Save new version process task relations error, processDefinitionCode:{}, processDefinitionVersion:{}.", processDefinition.getCode(), processDefinition.getVersion());
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
- }
+ } else
+ logger.info("Save new version process task relations complete, processDefinitionCode:{}, processDefinitionVersion:{}.", processDefinition.getCode(), processDefinition.getVersion());
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
+ logger.error("Save task definition error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskDefinition.getCode());
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
- }
+ } else
+ logger.info("Save task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskDefinition.getCode());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, taskDefinition);
return result;
@@ -287,6 +297,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), processCode, taskName);
if (taskDefinition == null) {
+ logger.error("Task definition does not exist, taskName:{}.", taskName);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName);
} else {
result.put(Constants.DATA_LIST, taskDefinition);
@@ -313,15 +324,18 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
if (taskCode == 0) {
+ logger.warn("Parameter taskCode 0 is invalid.");
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
+ logger.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return result;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
+ logger.warn("Task definition can not be deleted due to task state online, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
return result;
}
@@ -331,21 +345,27 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
.stream()
.map(ProcessTaskRelation::getPostTaskCode)
.collect(Collectors.toSet());
- putMsg(result, Status.TASK_HAS_DOWNSTREAM, StringUtils.join(postTaskCodes, ","));
+ String postTaskCodesStr = StringUtils.join(postTaskCodes, ",");
+ logger.warn("Task definition can not be deleted due to downstream tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}",
+ taskCode, postTaskCodesStr);
+ putMsg(result, Status.TASK_HAS_DOWNSTREAM, postTaskCodesStr);
return result;
}
int delete = taskDefinitionMapper.deleteByCode(taskCode);
if (delete > 0) {
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) {
+ logger.info("Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.", taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> relationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList());
updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList());
} else {
+ logger.info("Task definition delete complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode);
putMsg(result, Status.SUCCESS);
}
} else {
+ logger.error("Task definition delete error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode);
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
}
@@ -356,19 +376,24 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
List<TaskDefinitionLog> taskDefinitionLogs) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
+ logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
}
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
+ logger.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.", processDefinition.getProjectCode(), processDefinitionCode);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
- }
+ } else
+ logger.info("Save new version process definition complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.", processDefinition.getProjectCode(), processDefinitionCode, insertVersion);
List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
+ logger.info("Save new version task relations complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.", processDefinition.getProjectCode(), processDefinitionCode, insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
+ logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}.", processDefinition.getProjectCode(), processDefinitionCode);
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
@@ -392,10 +417,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) {
+ logger.info("Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
}
+ logger.info("Update task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
@@ -410,22 +437,25 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
+ logger.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return null;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
// if stream, can update task definition without online check
if (taskDefinition.getTaskExecuteType() != TaskExecuteType.STREAM) {
+ logger.warn("Only {} type task can be updated without online check, taskDefinitionCode:{}.", TaskExecuteType.STREAM, taskCode);
putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
return null;
}
}
TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinition.equals(taskDefinitionToUpdate)) {
+ logger.warn("Task definition does not need update because no change, taskDefinitionCode:{}.", taskCode);
return null;
}
if (taskDefinitionToUpdate == null) {
- logger.error("taskDefinitionJson is not valid json");
+ logger.warn("Parameter taskDefinitionJson is invalid.");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
return null;
}
@@ -434,12 +464,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
.taskParams(taskDefinitionToUpdate.getTaskParams())
.dependence(taskDefinitionToUpdate.getDependence())
.build())) {
- logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName());
+ logger.warn("Task definition parameters are invalid, taskDefinitionName:{}.", taskDefinitionToUpdate.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
return null;
}
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
if (version == null || version == 0) {
+ logger.error("Max version task definitionLog can not be found in database, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
return null;
}
@@ -458,9 +489,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinitionToUpdate.setCreateTime(now);
int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
if ((update & insert) != 1) {
+ logger.error("Update task definition or definitionLog error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode);
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
- }
+ } else
+ logger.info("Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
+ projectCode, taskCode, taskDefinitionToUpdate.getVersion());
return taskDefinitionToUpdate;
}
@@ -502,7 +536,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
if (!upstreamTaskCodes.isEmpty()) {
- putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(upstreamTaskCodes, Constants.COMMA));
+ String notExistTaskCodes = StringUtils.join(upstreamTaskCodes, Constants.COMMA);
+ logger.error("Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}.", notExistTaskCodes);
+ putMsg(result, Status.TASK_DEFINE_NOT_EXIST, notExistTaskCodes);
return result;
}
} else {
@@ -535,6 +571,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
updateDag(loginUser, result, taskRelation.getProcessDefinitionCode(), processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
}
+ logger.info("Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}.",
+ projectCode, taskCode, upstreamTaskCodes);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
@@ -559,11 +597,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
if (processService.isTaskOnline(taskCode)) {
+ logger.warn("Task definition version can not be switched due to process definition is {}, taskDefinitionCode:{}.",
+ ReleaseState.ONLINE.getDescp(), taskCode);
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
+ logger.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
return result;
}
@@ -575,13 +616,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (switchVersion > 0) {
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) {
+ logger.info("Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}.", taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate));
} else {
+ logger.info("Task definition version switch complete, switch task version to {}, taskDefinitionCode:{}.", version, taskCode);
putMsg(result, Status.SUCCESS);
}
} else {
+ logger.error("Task definition version switch error, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR);
}
return result;
@@ -625,16 +669,23 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
+ logger.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
} else {
if (taskDefinition.getVersion() == version) {
+ logger.warn("Task definition can not be deleted due to version is being used, projectCode:{}, taskDefinitionCode:{}, version:{}.",
+ projectCode, taskCode, version);
putMsg(result, Status.MAIN_TABLE_USING_VERSION);
return result;
}
int delete = taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version);
if (delete > 0) {
+ logger.info("Task definition version delete complete, projectCode:{}, taskDefinitionCode:{}, version:{}.",
+ projectCode, taskCode, version);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Task definition version delete error, projectCode:{}, taskDefinitionCode:{}, version:{}.",
+ projectCode, taskCode, version);
putMsg(result, Status.DELETE_TASK_DEFINITION_VERSION_ERROR);
}
}
@@ -652,6 +703,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
+ logger.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode));
} else {
result.put(Constants.DATA_LIST, taskDefinition);
@@ -717,7 +769,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
public Map<String, Object> genTaskCodeList(Integer genNum) {
Map<String, Object> result = new HashMap<>();
if (genNum == null || genNum < 1 || genNum > 100) {
- logger.error("the genNum must be great than 1 and less than 100");
+ logger.warn("Parameter genNum must be great than 1 and less than 100.");
putMsg(result, Status.DATA_IS_NOT_VALID, genNum);
return result;
}
@@ -727,7 +779,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskCodes.add(CodeGenerateUtils.getInstance().genCode());
}
} catch (CodeGenerateException e) {
- logger.error("Task code get error, ", e);
+ logger.error("Generate task definition code error.", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
}
putMsg(result, Status.SUCCESS);
@@ -766,6 +818,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, taskDefinition.getVersion());
if (taskDefinitionLog == null) {
+ logger.error("Task definition does not exist, taskDefinitionCode:{}.", code);
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
@@ -782,7 +835,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
try {
permissionCheck.checkPermission();
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.error("Resources permission check error, resourceIds:{}.", resourceIds, e);
putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION);
return result;
}
@@ -791,15 +844,18 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinitionLog.setFlag(Flag.YES);
break;
default:
+ logger.warn("Parameter releaseState is invalid.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
return result;
}
int update = taskDefinitionMapper.updateById(taskDefinition);
int updateLog = taskDefinitionLogMapper.updateById(taskDefinitionLog);
if ((update == 0 && updateLog == 1) || (update == 1 && updateLog == 0)) {
+ logger.error("Update taskDefinition state or taskDefinitionLog state error, taskDefinitionCode:{}.", code);
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
+ logger.error("Update taskDefinition state or taskDefinitionLog state to complete, taskDefinitionCode:{}.", code);
putMsg(result, Status.SUCCESS);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
index 7040706290..d954b94413 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
@@ -91,19 +91,23 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
return result;
}
if (checkDescriptionLength(description)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
if (name == null) {
+ logger.warn("Parameter name can ot be null.");
putMsg(result, Status.NAME_NULL);
return result;
}
if (groupSize <= 0) {
+ logger.warn("Parameter task group size is must bigger than 1.");
putMsg(result, Status.TASK_GROUP_SIZE_ERROR);
return result;
}
TaskGroup taskGroup1 = taskGroupMapper.queryByName(loginUser.getId(), name);
if (taskGroup1 != null) {
+ logger.warn("Task group with the same name already exists, taskGroupName:{}.", taskGroup1.getName());
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
@@ -113,10 +117,11 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
taskGroup.setCreateTime(new Date());
taskGroup.setUpdateTime(new Date());
if (taskGroupMapper.insert(taskGroup) > 0) {
- permissionPostHandle(AuthorizationType.TASK_GROUP, loginUser.getId(),
- Collections.singletonList(taskGroup.getId()), logger);
+ permissionPostHandle(AuthorizationType.TASK_GROUP, loginUser.getId(), Collections.singletonList(taskGroup.getId()),logger);
+ logger.info("Create task group complete, taskGroupName:{}.", taskGroup.getName());
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Create task group error, taskGroupName:{}.", taskGroup.getName());
putMsg(result, Status.CREATE_TASK_GROUP_ERROR);
return result;
}
@@ -143,14 +148,17 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
return result;
}
if (checkDescriptionLength(description)) {
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
if (name == null) {
+ logger.warn("Parameter name can ot be null.");
putMsg(result, Status.NAME_NULL);
return result;
}
if (groupSize <= 0) {
+ logger.warn("Parameter task group size is must bigger than 1.");
putMsg(result, Status.TASK_GROUP_SIZE_ERROR);
return result;
}
@@ -160,11 +168,13 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
.ne(TaskGroup::getId, id));
if (exists > 0) {
+ logger.error("Task group with the same name already exists.");
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() != Flag.YES.getCode()) {
+ logger.warn("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_ERROR);
return result;
}
@@ -175,8 +185,13 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
taskGroup.setName(name);
}
int i = taskGroupMapper.updateById(taskGroup);
- logger.info("update result:{}", i);
- putMsg(result, Status.SUCCESS);
+ if (i > 0) {
+ logger.info("Update task group complete, taskGroupId:{}.", id);
+ putMsg(result, Status.SUCCESS);
+ } else {
+ logger.error("Update task group error, taskGroupId:{}.", id);
+ putMsg(result, Status.UPDATE_TASK_GROUP_ERROR);
+ }
return result;
}
@@ -323,11 +338,16 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.NO.getCode()) {
+ logger.info("Task group has been closed, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_CLOSED);
return result;
}
taskGroup.setStatus(Flag.NO.getCode());
- taskGroupMapper.updateById(taskGroup);
+ int update = taskGroupMapper.updateById(taskGroup);
+ if (update > 0)
+ logger.info("Task group close complete, taskGroupId:{}.", id);
+ else
+ logger.error("Task group close error, taskGroupId:{}.", id);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -351,12 +371,17 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
}
TaskGroup taskGroup = taskGroupMapper.selectById(id);
if (taskGroup.getStatus() == Flag.YES.getCode()) {
+ logger.info("Task group has been started, taskGroupId:{}.", id);
putMsg(result, Status.TASK_GROUP_STATUS_OPENED);
return result;
}
taskGroup.setStatus(Flag.YES.getCode());
taskGroup.setUpdateTime(new Date(System.currentTimeMillis()));
int update = taskGroupMapper.updateById(taskGroup);
+ if (update > 0)
+ logger.info("Task group start complete, taskGroupId:{}.", id);
+ else
+ logger.error("Task group start error, taskGroupId:{}.", id);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -391,6 +416,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
return result;
}
taskGroupQueueService.modifyPriority(queueId, priority);
+ logger.info("Modify task group queue priority complete, queueId:{}, priority:{}.", queueId, priority);
putMsg(result, Status.SUCCESS);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 67da7c95ed..1aabfd44c7 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -52,6 +52,8 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -65,6 +67,8 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@Service
public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInstanceService {
+ private static final Logger logger = LoggerFactory.getLogger(TaskInstanceServiceImpl.class);
+
@Autowired
ProjectMapper projectMapper;
@@ -201,18 +205,21 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
// check whether the task instance can be found
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
+ logger.error("Task instance can not be found, projectCode:{}, taskInstanceId:{}.", projectCode, taskInstanceId);
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
+ logger.error("Task definition can not be found, projectCode:{}, taskDefinitionCode:{}.", projectCode, task.getTaskCode());
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstanceId);
return result;
}
// check whether the task instance state type is failure or cancel
if (!task.getState().isFailure() && !task.getState().isKill()) {
+ logger.warn("{} type task instance can not perform force success, projectCode:{}, taskInstanceId:{}.", task.getState().getDesc(), projectCode, taskInstanceId);
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
return result;
}
@@ -222,8 +229,10 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
processService.forceProcessInstanceSuccessByTaskInstanceId(taskInstanceId);
+ logger.info("Task instance performs force success complete, projectCode:{}, taskInstanceId:{}", projectCode, taskInstanceId);
putMsg(result, Status.SUCCESS);
} else {
+ logger.error("Task instance performs force success complete, projectCode:{}, taskInstanceId:{}", projectCode, taskInstanceId);
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
}
return result;
@@ -235,7 +244,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
- Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode,FORCED_SUCCESS);
+ Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode, FORCED_SUCCESS);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
putMsg(result,status);
@@ -244,6 +253,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
if (taskInstance == null) {
+ logger.error("Task definition can not be found, projectCode:{}, taskInstanceId:{}.", projectCode, taskInstanceId);
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
@@ -272,6 +282,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstanceId);
if (taskInstance == null) {
+ logger.error("Task definition can not be found, projectCode:{}, taskInstanceId:{}.", projectCode, taskInstanceId);
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
index 994a3a0f74..5d866966d4 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
@@ -111,6 +111,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
private void updateTenantValid(Tenant existsTenant, Tenant updateTenant) throws ServiceException {
// Check the exists tenant
if (Objects.isNull(existsTenant)) {
+ logger.error("Tenant does not exist.");
throw new ServiceException(Status.TENANT_NOT_EXIST);
}
// Check the update tenant parameters
@@ -147,6 +148,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
if(checkDescriptionLength(desc)){
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -215,6 +217,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
if(checkDescriptionLength(desc)){
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -229,9 +232,14 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
if (!Objects.equals(existsTenant.getTenantCode(), updateTenant.getTenantCode()) && PropertyUtils.getResUploadStartupState()) {
storageOperate.createTenantDirIfNotExists(tenantCode);
}
- tenantMapper.updateById(updateTenant);
-
- putMsg(result, Status.SUCCESS);
+ int update = tenantMapper.updateById(updateTenant);
+ if (update > 0) {
+ logger.info("Tenant is updated and id is {}.", updateTenant.getId());
+ putMsg(result, Status.SUCCESS);
+ } else {
+ logger.error("Tenant update error, id:{}.", updateTenant.getId());
+ putMsg(result, Status.UPDATE_TENANT_ERROR);
+ }
return result;
}
@@ -254,22 +262,26 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
Tenant tenant = tenantMapper.queryById(id);
if (Objects.isNull(tenant)) {
+ logger.error("Tenant does not exist, tenantId:{}.", id);
throw new ServiceException(Status.TENANT_NOT_EXIST);
}
List<ProcessInstance> processInstances = getProcessInstancesByTenant(tenant);
if (CollectionUtils.isNotEmpty(processInstances)) {
+ logger.warn("Delete tenant failed, because there are {} executing process instances using it.", processInstances.size());
throw new ServiceException(Status.DELETE_TENANT_BY_ID_FAIL, processInstances.size());
}
List<ProcessDefinition> processDefinitions =
processDefinitionMapper.queryDefinitionListByTenant(tenant.getId());
if (CollectionUtils.isNotEmpty(processDefinitions)) {
+ logger.warn("Delete tenant failed, because there are {} process definitions using it.", processDefinitions.size());
throw new ServiceException(Status.DELETE_TENANT_BY_ID_FAIL_DEFINES, processDefinitions.size());
}
List<User> userList = userMapper.queryUserListByTenant(tenant.getId());
if (CollectionUtils.isNotEmpty(userList)) {
+ logger.warn("Delete tenant failed, because there are {} users using it.", userList.size());
throw new ServiceException(Status.DELETE_TENANT_BY_ID_FAIL_USERS, userList.size());
}
@@ -278,10 +290,16 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
storageOperate.deleteTenant(tenant.getTenantCode());
}
- tenantMapper.deleteById(id);
- processInstanceMapper.updateProcessInstanceByTenantId(id, -1);
+ int delete = tenantMapper.deleteById(id);
+ if (delete > 0) {
+ processInstanceMapper.updateProcessInstanceByTenantId(id, -1);
+ logger.info("Tenant is deleted and id is {}.", id);
+ putMsg(result, Status.SUCCESS);
+ } else {
+ logger.error("Tenant delete failed, tenantId:{}.", id);
+ putMsg(result, Status.DELETE_TENANT_BY_ID_ERROR);
+ }
- putMsg(result, Status.SUCCESS);
return result;
}
@@ -315,7 +333,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
* verify tenant code
*
* @param tenantCode tenant code
- * @return true if tenant code can user, otherwise return false
+ * @return true if tenant code can be used, otherwise return false
*/
@Override
public Result<Object> verifyTenantCode(String tenantCode) {
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
index 486a9c4af2..578668eea6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
@@ -94,25 +94,27 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
return result;
}
if(checkDescriptionLength(desc)){
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
// if resource upload startup
if (!PropertyUtils.getResUploadStartupState()) {
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
+ logger.error("Storage does not start up, resource upload startup state: {}.", PropertyUtils.getResUploadStartupState());
putMsg(result, Status.HDFS_NOT_STARTUP);
return result;
}
// verify udf func name exist
if (checkUdfFuncNameExists(funcName)) {
+ logger.warn("Udf function with the same name already exists.");
putMsg(result, Status.UDF_FUNCTION_EXISTS);
return result;
}
Resource resource = resourceMapper.selectById(resourceId);
if (resource == null) {
- logger.error("resourceId {} is not exist", resourceId);
+ logger.error("Resource does not exist, resourceId:{}.", resourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -138,6 +140,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
udf.setUpdateTime(now);
udfFuncMapper.insert(udf);
+ logger.info("UDF function create complete, udfFuncName:{}.", udf.getFuncName());
putMsg(result, Status.SUCCESS);
permissionPostHandle(AuthorizationType.UDF, loginUser.getId(), Collections.singletonList(udf.getId()), logger);
return result;
@@ -169,6 +172,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
}
UdfFunc udfFunc = udfFuncMapper.selectById(id);
if (udfFunc == null) {
+ logger.error("Resource does not exist, resourceId:{}.", id);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -208,6 +212,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
return result;
}
if(checkDescriptionLength(desc)){
+ logger.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@@ -215,6 +220,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
UdfFunc udf = udfFuncMapper.selectUdfById(udfFuncId);
if (udf == null) {
+ logger.error("UDF function does not exist, udfFuncId:{}.", udfFuncId);
result.setCode(Status.UDF_FUNCTION_NOT_EXIST.getCode());
result.setMsg(Status.UDF_FUNCTION_NOT_EXIST.getMsg());
return result;
@@ -222,7 +228,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
// if resource upload startup
if (!PropertyUtils.getResUploadStartupState()) {
- logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
+ logger.error("Storage does not start up, resource upload startup state: {}.", PropertyUtils.getResUploadStartupState());
putMsg(result, Status.HDFS_NOT_STARTUP);
return result;
}
@@ -230,7 +236,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
// verify udfFuncName is exist
if (!funcName.equals(udf.getFuncName())) {
if (checkUdfFuncNameExists(funcName)) {
- logger.error("UdfFuncRequest {} has exist, can't create again.", funcName);
+ logger.warn("Udf function exists, can not create again, udfFuncName:{}.", funcName);
result.setCode(Status.UDF_FUNCTION_EXISTS.getCode());
result.setMsg(Status.UDF_FUNCTION_EXISTS.getMsg());
return result;
@@ -239,7 +245,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
Resource resource = resourceMapper.selectById(resourceId);
if (resource == null) {
- logger.error("resourceId {} is not exist", resourceId);
+ logger.error("Resource does not exist, resourceId:{}.", resourceId);
result.setCode(Status.RESOURCE_NOT_EXIST.getCode());
result.setMsg(Status.RESOURCE_NOT_EXIST.getMsg());
return result;
@@ -259,6 +265,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
udf.setUpdateTime(now);
udfFuncMapper.updateById(udf);
+ logger.info("UDF function update complete, udfFuncId:{}, udfFuncName:{}.", udfFuncId, funcName);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -354,6 +361,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
}
udfFuncMapper.deleteById(id);
udfUserMapper.deleteByUdfFuncId(id);
+ logger.info("UDF function delete complete, udfFuncId:{}.", id);
putMsg(result, Status.SUCCESS);
return result;
}
@@ -374,6 +382,7 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
}
if (checkUdfFuncNameExists(name)) {
+ logger.warn("Udf function with the same name already exists.");
putMsg(result, Status.UDF_FUNCTION_EXISTS);
} else {
putMsg(result, Status.SUCCESS);
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UiPluginServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UiPluginServiceImpl.java
index fe82497fad..48d6473284 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UiPluginServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UiPluginServiceImpl.java
@@ -30,6 +30,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -39,6 +41,8 @@ import org.springframework.stereotype.Service;
@Service
public class UiPluginServiceImpl extends BaseServiceImpl implements UiPluginService {
+ private static final Logger logger = LoggerFactory.getLogger(UiPluginServiceImpl.class);
+
@Autowired
PluginDefineMapper pluginDefineMapper;
@@ -46,12 +50,14 @@ public class UiPluginServiceImpl extends BaseServiceImpl implements UiPluginServ
public Map<String, Object> queryUiPluginsByType(PluginType pluginType) {
Map<String, Object> result = new HashMap<>();
if (!pluginType.getHasUi()) {
+ logger.warn("Plugin does not have UI.");
putMsg(result, Status.PLUGIN_NOT_A_UI_COMPONENT);
return result;
}
List<PluginDefine> pluginDefines = pluginDefineMapper.queryByPluginType(pluginType.getDesc());
if (CollectionUtils.isEmpty(pluginDefines)) {
+ logger.warn("Query plugins result is null, check status of plugins.");
putMsg(result, Status.QUERY_PLUGINS_RESULT_IS_NULL);
return result;
}
@@ -66,6 +72,7 @@ public class UiPluginServiceImpl extends BaseServiceImpl implements UiPluginServ
Map<String, Object> result = new HashMap<>();
PluginDefine pluginDefine = pluginDefineMapper.queryDetailById(id);
if (null == pluginDefine) {
+ logger.warn("Query plugins result is empty, pluginId:{}.", id);
putMsg(result, Status.QUERY_PLUGIN_DETAIL_RESULT_IS_NULL);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
index f106c372dc..b19ef4db40 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
@@ -173,6 +173,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
if (!checkTenantExists(tenantId)) {
+ logger.warn("Tenant does not exist, tenantId:{}.", tenantId);
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
@@ -185,6 +186,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
storageOperate.createTenantDirIfNotExists(tenant.getTenantCode());
}
+ logger.info("User is created and id is {}.", user.getId());
result.put(Constants.DATA_LIST, user);
putMsg(result, Status.SUCCESS);
return result;
@@ -339,6 +341,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
return result;
}
if (!isAdmin(loginUser)) {
+ logger.warn("User does not have permission for this feature, userId:{}, userName:{}.", loginUser.getId(), loginUser.getUserName());
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -389,22 +392,26 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
return result;
}
if (check(result, !canOperator(loginUser, userId), Status.USER_NO_OPERATION_PERM)) {
+ logger.warn("User does not have permission for this feature, userId:{}, userName:{}.", loginUser.getId(), loginUser.getUserName());
return result;
}
User user = userMapper.selectById(userId);
if (user == null) {
+ logger.error("User does not exist, userId:{}.", userId);
putMsg(result, Status.USER_NOT_EXIST, userId);
return result;
}
if (StringUtils.isNotEmpty(userName)) {
if (!CheckUtils.checkUserName(userName)) {
+ logger.warn("Parameter userName check failed.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, userName);
return result;
}
User tempUser = userMapper.queryByUserNameAccurately(userName);
if (tempUser != null && tempUser.getId() != userId) {
+ logger.warn("User name already exists, userName:{}.", tempUser.getUserName());
putMsg(result, Status.USER_NAME_EXIST);
return result;
}
@@ -413,6 +420,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
if (StringUtils.isNotEmpty(userPassword)) {
if (!CheckUtils.checkPasswordLength(userPassword)) {
+ logger.warn("Parameter userPassword check failed.");
putMsg(result, Status.USER_PASSWORD_LENGTH_ERROR);
return result;
}
@@ -421,6 +429,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
if (StringUtils.isNotEmpty(email)) {
if (!CheckUtils.checkEmail(email)) {
+ logger.warn("Parameter email check failed.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, email);
return result;
}
@@ -428,17 +437,20 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
if (StringUtils.isNotEmpty(phone) && !CheckUtils.checkPhone(phone)) {
+ logger.warn("Parameter phone check failed.");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, phone);
return result;
}
if (state == 0 && user.getState() != state && Objects.equals(loginUser.getId(), user.getId())) {
+ logger.warn("Not allow to disable your own account, userId:{}, userName:{}.", user.getId(), user.getUserName());
putMsg(result, Status.NOT_ALLOW_TO_DISABLE_OWN_ACCOUNT);
return result;
}
if (StringUtils.isNotEmpty(timeZone)) {
if (!CheckUtils.checkTimeZone(timeZone)) {
+ logger.warn("Parameter time zone is illegal.");
putMsg(result, Status.TIME_ZONE_ILLEGAL, timeZone);
return result;
}
@@ -452,8 +464,15 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
user.setUpdateTime(now);
user.setTenantId(tenantId);
// updateProcessInstance user
- userMapper.updateById(user);
- putMsg(result, Status.SUCCESS);
+ int update = userMapper.updateById(user);
+ if (update > 0) {
+ logger.info("User is updated and id is :{}.", userId);
+ putMsg(result, Status.SUCCESS);
+ } else {
+ logger.error("User update error, userId:{}.", userId);
+ putMsg(result, Status.UPDATE_USER_ERROR);
+ }
+
return result;
}
@@ -475,12 +494,14 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
// only admin can operate
if (!isAdmin(loginUser)) {
+ logger.warn("User does not have permission for this feature, userId:{}, userName:{}.", loginUser.getId(), loginUser.getUserName());
putMsg(result, Status.USER_NO_OPERATION_PERM, id);
return result;
}
// check exist
User tempUser = userMapper.selectById(id);
if (tempUser == null) {
+ logger.error("User does not exist, userId:{}.", id);
putMsg(result, Status.USER_NOT_EXIST, id);
return result;
}
@@ -489,6 +510,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
if (CollectionUtils.isNotEmpty(projects)) {
String projectNames = projects.stream().map(Project::getName).collect(Collectors.joining(","));
putMsg(result, Status.TRANSFORM_PROJECT_OWNERSHIP, projectNames);
+ logger.warn("Please transfer the project ownership before deleting the user, userId:{}, projects:{}.", id, projectNames);
return result;
}
// delete user
@@ -496,11 +518,15 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
accessTokenMapper.deleteAccessTokenByUserId(id);
- userMapper.deleteById(id);
-
- putMsg(result, Status.SUCCESS);
-
- return result;
+ if (userMapper.deleteById(id) > 0) {
+ logger.info("User is deleted and id is :{}.", id);
+ putMsg(result, Status.SUCCESS);
+ return result;
+ } else {
+ logger.error("User delete error, userId:{}.", id);
+ putMsg(result, Status.DELETE_USER_BY_ID_ERROR);
+ return result;
+ }
}
/**
@@ -524,6 +550,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
// check exist
User tempUser = userMapper.selectById(userId);
if (tempUser == null) {
+ logger.error("User does not exist, userId:{}.", userId);
putMsg(result, Status.USER_NOT_EXIST, userId);
return result;
}
@@ -531,6 +558,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
projectUserMapper.deleteProjectRelation(0, userId);
if (check(result, StringUtils.isEmpty(projectIds), Status.SUCCESS)) {
+ logger.warn("Parameter projectIds is empty.");
return result;
}
Arrays.stream(projectIds.split(",")).distinct().forEach(projectId -> {
@@ -569,6 +597,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
// 1. check if user is existed
User tempUser = this.userMapper.selectById(userId);
if (tempUser == null) {
+ logger.error("User does not exist, userId:{}.", userId);
this.putMsg(result, Status.USER_NOT_EXIST, userId);
return result;
}
@@ -576,12 +605,14 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
// 2. check if project is existed
Project project = this.projectMapper.queryByCode(projectCode);
if (project == null) {
+ logger.error("Project does not exist, projectCode:{}.", projectCode);
this.putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
return result;
}
// 3. only project owner can operate
if (!this.canOperator(loginUser, project.getUserId())) {
+ logger.warn("User does not have permission for project, userId:{}, userName:{}, projectCode:{}.", loginUser.getId(), loginUser.getUserName(), projectCode);
this.putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@@ -598,7 +629,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
projectUser.setUpdateTime(today);
this.projectUserMapper.insert(projectUser);
}
-
+ logger.info("User is granted permission for projects, userId:{}, projectCode:{}.", userId, projectCode);
this.putMsg(result, Status.SUCCESS);
return result;
}
@@ -622,12 +653,14 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
// 1. only admin can operate
if (this.check(result, !this.isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
+ logger.warn("Only admin can revoke the project permission.");
return result;
}
// 2. check if user is existed
User user = this.userMapper.selectById(userId);
if (user == null) {
+ logger.error("User does not exist, userId:{}.", userId);
this.putMsg(result, Status.USER_NOT_EXIST, userId);
return result;
}
@@ -635,12 +668,14 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
// 3. check if project is existed
Project project = this.projectMapper.queryByCode(projectCode);
if (project == null) {
+ logger.error("Project does not exist, projectCode:{}.", projectCode);
this.putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
return result;
}
// 4. delete th relationship between project and user
this.projectUserMapper.deleteProjectRelation(project.getId(), user.getId());
+ logger.info("User is revoked permission for projects, userId:{}, projectCode:{}.", userId, projectCode);
this.putMsg(result, Status.SUCCESS);
return result;
}
@@ -664,6 +699,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
User user = userMapper.selectById(userId);
if (user == null) {
+ logger.error("User does not exist, userId:{}.", userId);
putMsg(result, Status.USER_NOT_EXIST, userId);
return result;
}
@@ -694,7 +730,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
if (CollectionUtils.isNotEmpty(oldAuthorizedResIds)) {
- // get all resource id of process definitions those is released
+ // get all resource ids of process definitions that are released
List<Map<String, Object>> list = processDefinitionMapper.listResourcesByUser(userId);
Map<Integer, Set<Long>> resourceProcessMap =
ResourceProcessDefinitionUtils.getResourceProcessDefinitionMap(list);
@@ -702,9 +738,8 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
resourceIdSet.retainAll(oldAuthorizedResIds);
if (CollectionUtils.isNotEmpty(resourceIdSet)) {
- logger.error("can't be deleted,because it is used of process definition");
for (Integer resId : resourceIdSet) {
- logger.error("resource id:{} is used of process definition {}", resId,
+ logger.error("Resource id:{} is used by process definition {}", resId,
resourceProcessMap.get(resId));
}
putMsg(result, Status.RESOURCE_IS_USED);
@@ -716,12 +751,14 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
resourceUserMapper.deleteResourceUser(userId, 0);
if (check(result, StringUtils.isEmpty(resourceIds), Status.SUCCESS)) {
+ logger.warn("Parameter resourceIds is empty.");
return result;
}
for (int resourceIdValue : needAuthorizeResIds) {
Resource resource = resourceMapper.selectById(resourceIdValue);
if (resource == null) {
+ logger.error("Resource does not exist, resourceId:{}.", resourceIdValue);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
@@ -742,6 +779,8 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
+ logger.info("User is granted permission for resources, userId:{}, resourceIds:{}.", user.getId(), needAuthorizeResIds);
+
putMsg(result, Status.SUCCESS);
return result;
@@ -766,6 +805,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
User user = userMapper.selectById(userId);
if (user == null) {
+ logger.error("User does not exist, userId:{}.", userId);
putMsg(result, Status.USER_NOT_EXIST, userId);
return result;
}
@@ -773,6 +813,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
udfUserMapper.deleteByUserId(userId);
if (check(result, StringUtils.isEmpty(udfIds), Status.SUCCESS)) {
+ logger.warn("Parameter udfIds is empty.");
return result;
}
@@ -789,6 +830,8 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
udfUserMapper.insert(udfUser);
}
+ logger.info("User is granted permission for UDF, userName:{}.", user.getUserName());
+
putMsg(result, Status.SUCCESS);
return result;
@@ -813,12 +856,14 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
// only admin can operate
if (this.check(result, !this.isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
+ logger.warn("Only admin can grant namespaces.");
return result;
}
// check exist
User tempUser = userMapper.selectById(userId);
if (tempUser == null) {
+ logger.error("User does not exist, userId:{}.", userId);
putMsg(result, Status.USER_NOT_EXIST, userId);
return result;
}
@@ -838,6 +883,8 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
}
+ logger.info("User is granted permission for namespace, userId:{}.", tempUser.getId());
+
putMsg(result, Status.SUCCESS);
return result;
@@ -952,6 +999,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
// only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
+ logger.warn("Only admin can query all general users.");
return result;
}
@@ -1020,6 +1068,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
// only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
+ logger.warn("Only admin can deauthorize user.");
return result;
}
@@ -1060,6 +1109,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
}
// only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
+ logger.warn("Only admin can authorize user.");
return result;
}
List<User> userList = userMapper.queryUserListByAlertGroupId(alertGroupId);
@@ -1084,16 +1134,16 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
String msg = null;
if (!CheckUtils.checkUserName(userName)) {
-
+ logger.warn("Parameter userName check failed.");
msg = userName;
} else if (!CheckUtils.checkPassword(password)) {
-
+ logger.warn("Parameter password check failed.");
msg = password;
} else if (!CheckUtils.checkEmail(email)) {
-
+ logger.warn("Parameter email check failed.");
msg = email;
} else if (!CheckUtils.checkPhone(phone)) {
-
+ logger.warn("Parameter phone check failed.");
msg = phone;
}
@@ -1119,7 +1169,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
// verify whether exist
if (!storageOperate.exists(oldTenantCode,
String.format(Constants.FORMAT_S_S, srcBasePath, component.getFullName()))) {
- logger.error("resource file: {} not exist,copy error", component.getFullName());
+ logger.error("Resource file: {} does not exist, copy error.", component.getFullName());
throw new ServiceException(Status.RESOURCE_NOT_EXIST);
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
index 836ac3be29..791d4acc22 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java
@@ -51,6 +51,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@@ -61,6 +63,8 @@ import org.springframework.util.CollectionUtils;
@Service
public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkFlowLineageService {
+ private static final Logger logger = LoggerFactory.getLogger(WorkFlowLineageServiceImpl.class);
+
@Autowired
private WorkFlowLineageMapper workFlowLineageMapper;
@@ -78,6 +82,7 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
+ logger.error("Project does not exist, projectCode:{}.", projectCode);
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
return result;
}
@@ -92,6 +97,7 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
+ logger.error("Project does not exist, projectCode:{}.", projectCode);
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
return result;
}
@@ -160,6 +166,7 @@ public class WorkFlowLineageServiceImpl extends BaseServiceImpl implements WorkF
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByCode(projectCode);
if (project == null) {
+ logger.error("Project does not exist, projectCode:{}.", projectCode);
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 4e27998019..b00efe69b3 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -95,6 +95,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
return result;
}
if (StringUtils.isEmpty(name)) {
+ logger.warn("Parameter name can not be null.");
putMsg(result, Status.NAME_NULL);
return result;
}
@@ -117,15 +118,18 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroup.setDescription(description);
if (checkWorkerGroupNameExists(workerGroup)) {
+ logger.warn("Worker group with the same name already exists, name:{}.", workerGroup.getName());
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
return result;
}
String invalidAddr = checkWorkerGroupAddrList(workerGroup);
if (invalidAddr != null) {
+ logger.warn("Worker group address is invalid, invalidAddr:{}.", invalidAddr);
putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
return result;
}
handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson);
+ logger.info("Worker group save complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
return result;
}
@@ -295,7 +299,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
try {
workerGroupList = registryClient.getChildrenKeys(workerPath);
} catch (Exception e) {
- logger.error("getWorkerGroups exception, workerPath: {}, isPaging: {}", workerPath, isPaging, e);
+ logger.error("Get worker groups exception, workerPath:{}, isPaging:{}", workerPath, isPaging, e);
}
if (CollectionUtils.isEmpty(workerGroupList)) {
@@ -317,7 +321,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
try {
childrenNodes = registryClient.getChildrenKeys(workerGroupPath);
} catch (Exception e) {
- logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
+ logger.error("Get children nodes exception, workerGroupPath:{}.", workerGroupPath, e);
}
if (childrenNodes == null || childrenNodes.isEmpty()) {
continue;
@@ -362,17 +366,22 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
}
WorkerGroup workerGroup = workerGroupMapper.selectById(id);
if (workerGroup == null) {
+ logger.error("Worker group does not exist, workerGroupId:{}.", id);
putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST);
return result;
}
List<ProcessInstance> processInstances = processInstanceMapper
.queryByWorkerGroupNameAndStatus(workerGroup.getName(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
+ List<Integer> processInstanceIds = processInstances.stream().map(ProcessInstance::getId).collect(Collectors.toList());
+ logger.warn("Delete worker group failed because there are {} processInstances using it, processInstanceIds:{}.",
+ processInstances.size(), processInstanceIds);
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
return result;
}
workerGroupMapper.deleteById(id);
processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), "");
+ logger.info("Delete worker group complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
return result;
}
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java
index 3304d0c497..f85a7b111c 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/FileUtils.java
@@ -65,8 +65,7 @@ public class FileUtils {
if (resource.exists() || resource.isReadable()) {
return resource;
} else {
- logger.error("file can not read : {}", filename);
-
+ logger.error("File can not be read, fileName:{}", filename);
}
return null;
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
index 4660b7e276..027f9c1860 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
@@ -183,6 +183,7 @@ public class TaskGroupServiceTest {
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.TASK_GROUP, null,
0, serviceLogger)).thenReturn(true);
Mockito.when(taskGroupMapper.selectById(1)).thenReturn(taskGroup);
+ Mockito.when(taskGroupMapper.updateById(taskGroup)).thenReturn(1);
Map<String, Object> result = taskGroupService.updateTaskGroup(loginUser, 1, "newName", "desc", 100);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
index bb4eb487bf..0ff9d4c5de 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
@@ -171,6 +171,7 @@ public class TenantServiceTest {
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.TENANT, null, getLoginUser().getId(), TENANT_UPDATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.TENANT, null, 0, baseServiceLogger)).thenReturn(true);
+ Mockito.when(tenantMapper.updateById(getTenant())).thenReturn(1);
// update not exists tenant
Throwable exception = Assertions.assertThrows(ServiceException.class, () -> tenantService.updateTenant(getLoginUser(), 912222, tenantCode, 1, tenantDesc));
@@ -218,6 +219,7 @@ public class TenantServiceTest {
// success
Mockito.when(tenantMapper.queryById(4)).thenReturn(getTenant(4));
+ Mockito.when(tenantMapper.deleteById(4)).thenReturn(1);
Map<String, Object> result = tenantService.deleteTenantById(getLoginUser(), 4);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
index 63dc2481c6..71fbfaf2b8 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
@@ -306,6 +306,7 @@ public class UsersServiceTest {
// success
when(userMapper.selectById(1)).thenReturn(getUser());
+ when(userMapper.updateById(getUser())).thenReturn(1);
result = usersService.updateUser(getLoginUser(), 1, userName, userPassword, "32222s@qq.com", 1,
"13457864543", "queue", 1, "Asia/Shanghai");
logger.info(result.toString());
@@ -322,8 +323,8 @@ public class UsersServiceTest {
try {
when(userMapper.queryTenantCodeByUserId(1)).thenReturn(getUser());
when(userMapper.selectById(1)).thenReturn(getUser());
- when(accessTokenMapper.deleteAccessTokenByUserId(1)).thenReturn(0);
- // no operate
+ when(userMapper.deleteById(1)).thenReturn(1);
+ //no operate
Map<String, Object> result = usersService.deleteUserById(loginUser, 3);
logger.info(result.toString());
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, result.get(Constants.STATUS));
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
index c392da409c..80c2d68cc8 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
@@ -54,7 +54,7 @@ public class DataSourceUtils {
public static ConnectionParam buildConnectionParams(BaseDataSourceParamDTO baseDataSourceParamDTO) {
ConnectionParam connectionParams = getDatasourceProcessor(baseDataSourceParamDTO.getType())
.createConnectionParams(baseDataSourceParamDTO);
- logger.info("parameters map:{}", connectionParams);
+ logger.info("Parameters map:{}", connectionParams);
return connectionParams;
}