You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/16 07:35:04 UTC
[inlong] branch master updated: [INLONG-7229][Manager] Add checks for unmodifiable field values (#7230)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 64bc316c4 [INLONG-7229][Manager] Add checks for unmodifiable field values (#7230)
64bc316c4 is described below
commit 64bc316c4e1146cd36c0acd184bf69b75568153b
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Jan 16 15:34:59 2023 +0800
[INLONG-7229][Manager] Add checks for unmodifiable field values (#7230)
---
.../inlong/manager/common/util/Preconditions.java | 38 +++++++
.../service/group/InlongGroupProcessService.java | 10 --
.../service/group/InlongGroupServiceImpl.java | 54 +++-------
.../manager/service/node/DataNodeServiceImpl.java | 59 ++++-------
.../service/sink/StreamSinkServiceImpl.java | 111 ++++++++-------------
.../service/stream/InlongStreamServiceImpl.java | 53 +---------
.../controller/openapi/OpenDataNodeController.java | 12 +++
.../openapi/OpenInLongGroupController.java | 16 +++
.../openapi/OpenInLongStreamController.java | 17 ++++
.../openapi/OpenStreamSinkController.java | 12 +++
.../web/controller/DataNodeControllerTest.java | 2 +-
11 files changed, 175 insertions(+), 209 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
index 725d095dc..4def9ead8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
@@ -18,11 +18,14 @@
package org.apache.inlong.manager.common.util;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
@@ -125,6 +128,41 @@ public class Preconditions {
}
}
+ public static void checkBlank(String obj, ErrorCodeEnum errorCodeEnum) {
+ if (StringUtils.isBlank(obj)) {
+ throw new BusinessException(errorCodeEnum);
+ }
+ }
+
+ public static void checkBlank(String obj, ErrorCodeEnum errorCodeEnum, String errMsg) {
+ if (StringUtils.isBlank(obj)) {
+ throw new BusinessException(errorCodeEnum, errMsg);
+ }
+ }
+
+ public static void checkNull(Object obj, ErrorCodeEnum errorCodeEnum) {
+ if (obj == null) {
+ throw new BusinessException(errorCodeEnum);
+ }
+ }
+
+ public static void checkNull(Object obj, ErrorCodeEnum errorCodeEnum, String errMsg) {
+ if (obj == null) {
+ throw new BusinessException(errorCodeEnum, errMsg);
+ }
+ }
+
+ public static void chkNotEquals(Object a, Object b, ErrorCodeEnum errorCodeEnum) {
+ if (!Objects.equals(a, b)) {
+ throw new BusinessException(errorCodeEnum);
+ }
+ }
+
+ public static void chkNotEquals(Object a, Object b, ErrorCodeEnum errorCodeEnum, String errMsg) {
+ if (!Objects.equals(a, b)) {
+ throw new BusinessException(errorCodeEnum, errMsg);
+ }
+ }
/**
* Whether a target string is in a string separated by the separator.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index e6f269f78..576618983 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.group;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupOperateType;
@@ -233,15 +232,6 @@ public class InlongGroupProcessService {
* Delete InlongGroup logically and delete related resource in a synchronous way.
*/
public Boolean deleteProcess(String groupId, UserInfo opInfo) {
- // check groupId parameter
- if (StringUtils.isBlank(groupId)) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "inlong group id in request cannot be blank");
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index e3d768f1e..0eeb312f0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -177,15 +177,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
@Transactional(rollbackFor = Throwable.class)
public String save(InlongGroupRequest request, UserInfo opInfo) {
- // check parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY,
- "inlong group request cannot be empty");
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
String groupId = request.getInlongGroupId();
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity != null) {
@@ -232,14 +223,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
public InlongGroupInfo get(String groupId, UserInfo opInfo) {
- // check group id
- if (StringUtils.isBlank(groupId)) {
- throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -372,14 +355,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
public List<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request, UserInfo opInfo) {
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "group query request cannot be empty");
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// filter records;
List<InlongGroupEntity> filterGroupEntities = new ArrayList<>();
for (InlongGroupEntity groupEntity : groupMapper.selectByCondition(request)) {
@@ -424,11 +399,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
LOGGER.error("inlong group not found by groupId={}", groupId);
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}
- if (!Objects.equals(entity.getVersion(), request.getVersion())) {
- LOGGER.error("inlong group has already updated with groupId={}, curVersion={}",
- groupId, request.getVersion());
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
+ chkUnmodifiableParams(entity, request);
// check whether the current status can be modified
doUpdateCheck(entity, request, operator);
@@ -446,22 +417,12 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
public String update(InlongGroupRequest request, UserInfo opInfo) {
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "group query request cannot be empty");
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
String groupId = request.getInlongGroupId();
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}
- if (!Objects.equals(entity.getVersion(), request.getVersion())) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
+ chkUnmodifiableParams(entity, request);
// only the person in charges can query
if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
List<String> inCharges = Arrays.asList(entity.getInCharges().split(InlongConstants.COMMA));
@@ -697,4 +658,15 @@ public class InlongGroupServiceImpl implements InlongGroupService {
}
return sortConf;
}
+
+ private void chkUnmodifiableParams(InlongGroupEntity entity, InlongGroupRequest request) {
+ // check mqType
+ Preconditions.chkNotEquals(entity.getMqType(), request.getMqType(),
+ ErrorCodeEnum.INVALID_PARAMETER, "mqType not allowed modify");
+ // check record version
+ Preconditions.chkNotEquals(entity.getVersion(), request.getVersion(),
+ ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("record has expired with record version=%d, request version=%d",
+ entity.getVersion(), request.getVersion()));
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index 614ff8ca2..130709ce8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -76,14 +77,6 @@ public class DataNodeServiceImpl implements DataNodeService {
@Override
public Integer save(DataNodeRequest request, UserInfo opInfo) {
- // check request parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// only the person in charges can query
if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -118,13 +111,6 @@ public class DataNodeServiceImpl implements DataNodeService {
@Override
public DataNodeInfo get(Integer id, UserInfo opInfo) {
- if (id == null) {
- throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// only the person in charges can query
if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -133,8 +119,7 @@ public class DataNodeServiceImpl implements DataNodeService {
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
}
- String dataNodeType = entity.getType();
- DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType);
+ DataNodeOperator dataNodeOperator = operatorFactory.getInstance(entity.getType());
return dataNodeOperator.getFromEntity(entity);
}
@@ -172,14 +157,6 @@ public class DataNodeServiceImpl implements DataNodeService {
@Override
public List<DataNodeInfo> list(DataNodePageRequest request, UserInfo opInfo) {
- // check request parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// only the person in charges can query
if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -203,6 +180,8 @@ public class DataNodeServiceImpl implements DataNodeService {
throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
String.format("data node record not found by id=%d", request.getId()));
}
+ // check whether the request tries to modify unmodifiable parameters
+ chkUnmodifiableParams(curEntity, request);
// Check whether the data node name exists with the same name and type
if (request.getName() != null) {
if (StringUtils.isBlank(request.getName())) {
@@ -227,14 +206,6 @@ public class DataNodeServiceImpl implements DataNodeService {
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean update(DataNodeRequest request, UserInfo opInfo) {
- // check request parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// only the person in charges can query
if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -245,6 +216,8 @@ public class DataNodeServiceImpl implements DataNodeService {
throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
String.format("data node record not found by id=%d", request.getId()));
}
+ // check whether the request tries to modify unmodifiable parameters
+ chkUnmodifiableParams(curEntity, request);
// Check whether the data node name exists with the same name and type
if (request.getName() != null) {
if (StringUtils.isBlank(request.getName())) {
@@ -278,7 +251,6 @@ public class DataNodeServiceImpl implements DataNodeService {
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}
-
request.setId(entity.getId());
Boolean result = this.update(request, operator);
LOGGER.info("success to update data node by key: {}", request);
@@ -298,14 +270,6 @@ public class DataNodeServiceImpl implements DataNodeService {
@Override
public Boolean delete(Integer id, UserInfo opInfo) {
- // check id parameter
- if (id == null) {
- throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// only the person in charges can query
if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -339,6 +303,17 @@ public class DataNodeServiceImpl implements DataNodeService {
return true;
}
+ private void chkUnmodifiableParams(DataNodeEntity curEntity, DataNodeRequest request) {
+ // check type
+ Preconditions.chkNotEquals(curEntity.getType(), request.getType(),
+ ErrorCodeEnum.INVALID_PARAMETER, "type not allowed modify");
+ // check record version
+ Preconditions.chkNotEquals(curEntity.getVersion(), request.getVersion(),
+ ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("record has expired with record version=%d, request version=%d",
+ curEntity.getVersion(), request.getVersion()));
+ }
+
@Override
public Boolean deleteByKey(String name, String type, String operator) {
DataNodeEntity entity = dataNodeMapper.selectByUniqueKey(name, type);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index fa1afaa0a..621afda19 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -68,7 +68,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -152,10 +151,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
public Integer save(SinkRequest request, UserInfo opInfo) {
// check request parameter
checkSinkRequestParams(request);
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
InlongGroupEntity entity = groupMapper.selectByGroupId(request.getInlongGroupId());
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
@@ -227,14 +222,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public StreamSink get(Integer id, UserInfo opInfo) {
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
- // check sink id
- if (id == null) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
- }
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
@@ -327,18 +314,10 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public List<? extends StreamSink> listByCondition(SinkPageRequest request, UserInfo opInfo) {
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "group query request cannot be empty");
- }
// check sink id
if (StringUtils.isBlank(request.getInlongGroupId())) {
throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
}
- // check opInfo
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// query result
List<StreamSinkEntity> sinkEntityList = sinkMapper.selectByCondition(request);
Map<String, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
@@ -372,24 +351,31 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Transactional(rollbackFor = Throwable.class)
public Boolean update(SinkRequest request, String operator) {
LOGGER.info("begin to update sink by id: {}", request);
- this.checkParams(request);
- Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
-
- // Check if it can be modified
- String groupId = request.getInlongGroupId();
- String streamId = request.getInlongStreamId();
- String sinkName = request.getSinkName();
- groupCheckService.checkGroupStatus(groupId, operator);
-
+ if (request == null) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "inlong sink request is empty");
+ }
+ if (request.getId() == null) {
+ throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
+ }
+ StreamSinkEntity curEntity = sinkMapper.selectByPrimaryKey(request.getId());
+ if (curEntity == null) {
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
+ }
+ chkUnmodifiableParams(curEntity, request);
+ groupCheckService.checkGroupStatus(request.getInlongGroupId(), operator);
// Check whether the stream exist or not
- InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
+ request.getInlongGroupId(), request.getInlongStreamId());
Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
// Check whether the sink name exists with the same groupId and streamId
- StreamSinkEntity existEntity = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+ StreamSinkEntity existEntity = sinkMapper.selectByUniqueKey(
+ request.getInlongGroupId(), request.getInlongStreamId(), request.getSinkName());
if (existEntity != null && !existEntity.getId().equals(request.getId())) {
String errMsg = "sink name=%s already exists with the groupId=%s streamId=%s";
- throw new BusinessException(String.format(errMsg, sinkName, groupId, streamId));
+ throw new BusinessException(String.format(errMsg,
+ request.getSinkName(), request.getInlongGroupId(), request.getInlongStreamId()));
}
SinkStatus nextStatus = null;
@@ -404,7 +390,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
// If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
if (streamSuccess && request.getStartProcess()) {
- this.startProcessForSink(groupId, streamId, operator);
+ this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), operator);
}
LOGGER.info("success to update sink by id: {}", request);
@@ -417,32 +403,11 @@ public class StreamSinkServiceImpl implements StreamSinkService {
if (request.getId() == null) {
throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
}
- // check opInfo
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
StreamSinkEntity curEntity = sinkMapper.selectByPrimaryKey(request.getId());
if (curEntity == null) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
}
- if (!Objects.equals(curEntity.getVersion(), request.getVersion())) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("sink has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
- curEntity.getInlongGroupId(), curEntity.getInlongStreamId(), curEntity.getSinkName(),
- curEntity.getVersion()));
- }
- if (StringUtils.isNotBlank(request.getInlongGroupId())
- && !curEntity.getInlongGroupId().equals(request.getInlongGroupId())) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "InlongGroupId not allowed modify");
- }
- if (StringUtils.isNotBlank(request.getInlongStreamId())
- && !curEntity.getInlongStreamId().equals(request.getInlongStreamId())) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "InlongStreamId not allowed modify");
- }
- request.setInlongGroupId(curEntity.getInlongGroupId());
- request.setInlongStreamId(curEntity.getInlongStreamId());
+ chkUnmodifiableParams(curEntity, request);
// check group record
InlongGroupEntity curGroupEntity = groupMapper.selectByGroupId(curEntity.getInlongGroupId());
if (curGroupEntity == null) {
@@ -510,7 +475,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}
-
request.setId(entity.getId());
Boolean result = this.update(request, operator);
LOGGER.info("success to update sink by key: {}", request);
@@ -552,13 +516,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean delete(Integer id, Boolean startProcess, UserInfo opInfo) {
- if (id == null) {
- throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
- }
- // check opInfo
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// check stream sink record
StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
if (sinkEntity == null) {
@@ -729,9 +686,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
private void checkSinkRequestParams(SinkRequest request) {
// check request parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
- }
// check group id
String groupId = request.getInlongGroupId();
if (StringUtils.isBlank(groupId)) {
@@ -787,4 +741,27 @@ public class StreamSinkServiceImpl implements StreamSinkService {
streamProcessOperation.deleteProcess(groupId, streamId, operator, false);
LOGGER.debug("success to start the delete-stream-process for groupId={} streamId={}", groupId, streamId);
}
+
+ private void chkUnmodifiableParams(StreamSinkEntity curEntity, SinkRequest request) {
+ // check type
+ Preconditions.chkNotEquals(curEntity.getSinkType(), request.getSinkType(),
+ ErrorCodeEnum.INVALID_PARAMETER, "sinkType not allowed modify");
+ // check record version
+ Preconditions.chkNotEquals(curEntity.getVersion(), request.getVersion(),
+ ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("record has expired with record version=%d, request version=%d",
+ curEntity.getVersion(), request.getVersion()));
+ if (StringUtils.isNotBlank(request.getInlongGroupId())
+ && !curEntity.getInlongGroupId().equals(request.getInlongGroupId())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "InlongGroupId not allowed modify");
+ }
+ if (StringUtils.isNotBlank(request.getInlongStreamId())
+ && !curEntity.getInlongStreamId().equals(request.getInlongStreamId())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "InlongStreamId not allowed modify");
+ }
+ request.setInlongGroupId(curEntity.getInlongGroupId());
+ request.setInlongStreamId(curEntity.getInlongStreamId());
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 1bd605647..0ae06efe7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -141,14 +141,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public Integer save(InlongStreamRequest request, UserInfo opInfo) {
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "inlong stream info is empty");
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
InlongGroupEntity entity = groupMapper.selectByGroupId(request.getInlongGroupId());
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -233,18 +225,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public InlongStreamInfo get(String groupId, String streamId, UserInfo opInfo) {
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
- // check group id
- if (StringUtils.isBlank(groupId)) {
- throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
- }
- // check stream id
- if (StringUtils.isBlank(streamId)) {
- throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
- }
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -341,14 +321,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public List<InlongStreamBriefInfo> listBrief(InlongStreamPageRequest request, UserInfo opInfo) {
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "group query request cannot be empty");
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
request.setCurrentUser(LoginUserUtils.getLoginUser().getName());
request.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.ADMIN));
return CommonBeanUtils.copyListProperties(streamMapper.selectByCondition(request), InlongStreamBriefInfo::new);
@@ -436,18 +408,15 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean update(InlongStreamRequest request, UserInfo opInfo) {
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "inlong stream request is empty");
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
InlongGroupEntity entity = groupMapper.selectByGroupId(request.getInlongGroupId());
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
}
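+ // check record version (NOTE(review): entity is the InlongGroupEntity loaded above, request is an InlongStreamRequest - confirm these versions are meant to be compared)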
+ Preconditions.chkNotEquals(entity.getVersion(), request.getVersion(),
+ ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("record has expired with record version=%d, request version=%d",
+ entity.getVersion(), request.getVersion()));
// only the person in charges can query
if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
List<String> inCharges = Arrays.asList(entity.getInCharges().split(InlongConstants.COMMA));
@@ -581,18 +550,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean delete(String groupId, String streamId, UserInfo opInfo) {
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
- // check group id
- if (StringUtils.isBlank(groupId)) {
- throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
- }
- // check stream id
- if (StringUtils.isBlank(streamId)) {
- throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
- }
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
if (groupEntity == null) {
throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java
index 9fd214166..bd32fbda6 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java
@@ -17,7 +17,9 @@
package org.apache.inlong.manager.web.controller.openapi;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
import org.apache.inlong.manager.pojo.common.Response;
@@ -58,12 +60,16 @@ public class OpenDataNodeController {
@ApiOperation(value = "Get node by id")
@ApiImplicitParam(name = "id", value = "Data node ID", dataTypeClass = Integer.class, required = true)
public Response<DataNodeInfo> get(@PathVariable Integer id) {
+ Preconditions.checkNull(id, ErrorCodeEnum.ID_IS_EMPTY);
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(dataNodeService.get(id, LoginUserUtils.getLoginUser()));
}
@PostMapping(value = "/node/list")
@ApiOperation(value = "List data node")
public Response<List<DataNodeInfo>> list(@RequestBody DataNodePageRequest request) {
+ Preconditions.checkNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY);
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(dataNodeService.list(request, LoginUserUtils.getLoginUser()));
}
@@ -71,6 +77,8 @@ public class OpenDataNodeController {
@ApiOperation(value = "Save node")
@OperationLog(operation = OperationType.CREATE)
public Response<Integer> save(@Validated(SaveValidation.class) @RequestBody DataNodeRequest request) {
+ Preconditions.checkNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY);
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(dataNodeService.save(request, LoginUserUtils.getLoginUser()));
}
@@ -78,6 +86,8 @@ public class OpenDataNodeController {
@ApiOperation(value = "Update data node")
@OperationLog(operation = OperationType.UPDATE)
public Response<Boolean> update(@Validated(UpdateByIdValidation.class) @RequestBody DataNodeRequest request) {
// Updates a data node on behalf of the current login user; the Boolean
// returned by dataNodeService.update is wrapped in Response.success.
// The request is validated with the UpdateByIdValidation group; the added
// checks then verify the request body (REQUEST_IS_EMPTY) and the login user
// (LOGIN_USER_EMPTY) before delegating.
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY);
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(dataNodeService.update(request, LoginUserUtils.getLoginUser()));
}
@@ -86,6 +96,8 @@ public class OpenDataNodeController {
@OperationLog(operation = OperationType.DELETE)
@ApiImplicitParam(name = "id", value = "Data node ID", dataTypeClass = Integer.class, required = true)
public Response<Boolean> delete(@PathVariable Integer id) {
// Deletes the data node with the given id on behalf of the current login
// user; the Boolean returned by dataNodeService.delete is wrapped in
// Response.success.
// The added checks run first: id with ErrorCodeEnum.ID_IS_EMPTY and the login
// user with ErrorCodeEnum.LOGIN_USER_EMPTY.
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(id, ErrorCodeEnum.ID_IS_EMPTY);
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(dataNodeService.delete(id, LoginUserUtils.getLoginUser()));
}
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
index 66e39f0e8..c279997a5 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
@@ -17,7 +17,9 @@
package org.apache.inlong.manager.web.controller.openapi;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.Response;
@@ -63,12 +65,17 @@ public class OpenInLongGroupController {
@ApiOperation(value = "Get InLong group information by groupId")
@ApiImplicitParam(name = "groupId", value = "InLong Group ID", dataTypeClass = String.class, required = true)
public Response<InlongGroupInfo> get(@PathVariable String groupId) {
// Fetches one inlong group by groupId for the current login user and wraps
// the service result in Response.success.
// The added checks run first: groupId via checkBlank with
// ErrorCodeEnum.GROUP_ID_IS_EMPTY, and the login user via checkNull with
// ErrorCodeEnum.LOGIN_USER_EMPTY.
// NOTE(review): checkBlank presumably rejects null/blank strings and checkNull
// null references by throwing; confirm in Preconditions.java.
+ Preconditions.checkBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(groupService.get(groupId, LoginUserUtils.getLoginUser()));
}
@PostMapping(value = "/group/list")
@ApiOperation(value = "List inlong groups by paginating")
public Response<List<InlongGroupBriefInfo>> listBrief(@RequestBody InlongGroupPageRequest request) {
// Lists brief inlong group records for the page request and the current login
// user; the service result is wrapped in Response.success.
// The request check here uses ErrorCodeEnum.INVALID_PARAMETER plus an explicit
// message, whereas save/update in this controller use REQUEST_IS_EMPTY.
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER,
+ "group query request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(groupService.listBrief(request, LoginUserUtils.getLoginUser()));
}
@@ -76,6 +83,9 @@ public class OpenInLongGroupController {
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save inlong group")
public Response<String> save(@Validated(SaveValidation.class) @RequestBody InlongGroupRequest groupRequest) {
// Saves an inlong group on behalf of the current login user; the String
// returned by groupService.save is wrapped in Response.success.
// The request is validated with the SaveValidation group; the added checks
// then verify the request body (REQUEST_IS_EMPTY, with an explicit message)
// and the login user (LOGIN_USER_EMPTY) before delegating.
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(groupRequest, ErrorCodeEnum.REQUEST_IS_EMPTY,
+ "inlong group request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(groupService.save(groupRequest, LoginUserUtils.getLoginUser()));
}
@@ -83,6 +93,9 @@ public class OpenInLongGroupController {
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update inlong group")
public Response<String> update(@Validated(UpdateValidation.class) @RequestBody InlongGroupRequest groupRequest) {
// Updates an inlong group on behalf of the current login user; the String
// returned by groupService.update is wrapped in Response.success.
// The request is validated with the UpdateValidation group; the added checks
// then verify the request body (REQUEST_IS_EMPTY) and the login user
// (LOGIN_USER_EMPTY) before delegating.
// The message names the group request itself (same wording as save): this
// endpoint receives an update payload, not a query request.
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(groupRequest, ErrorCodeEnum.REQUEST_IS_EMPTY,
+ "inlong group request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(groupService.update(groupRequest, LoginUserUtils.getLoginUser()));
}
@@ -91,6 +104,9 @@ public class OpenInLongGroupController {
@OperationLog(operation = OperationType.DELETE)
@ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
public Response<Boolean> delete(@PathVariable String groupId) {
// Deletes an inlong group by delegating to groupProcessOperation.deleteProcess
// with the groupId and the current login user; the Boolean result is wrapped
// in Response.success.
// The groupId check here uses ErrorCodeEnum.INVALID_PARAMETER plus an explicit
// message, whereas get in this controller uses GROUP_ID_IS_EMPTY.
// NOTE(review): checkBlank/checkNull presumably throw on blank/null input;
// confirm in Preconditions.java.
+ Preconditions.checkBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER,
+ "inlong group id in request cannot be blank");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(groupProcessOperation.deleteProcess(groupId, LoginUserUtils.getLoginUser()));
}
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
index 031e54ca9..92786b1b2 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
@@ -17,7 +17,9 @@
package org.apache.inlong.manager.web.controller.openapi;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -60,12 +62,18 @@ public class OpenInLongStreamController {
@ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
})
public Response<InlongStreamInfo> get(@RequestParam String groupId, @RequestParam String streamId) {
// Fetches one inlong stream identified by groupId + streamId for the current
// login user and wraps the service result in Response.success.
// The added checks run first: groupId (GROUP_ID_IS_EMPTY) and streamId
// (STREAM_ID_IS_EMPTY) via checkBlank, then the login user (LOGIN_USER_EMPTY)
// via checkNull.
// NOTE(review): checkBlank/checkNull presumably throw on blank/null input;
// confirm in Preconditions.java.
+ Preconditions.checkBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+ Preconditions.checkBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(streamService.get(groupId, streamId, LoginUserUtils.getLoginUser()));
}
@RequestMapping(value = "/stream/list", method = RequestMethod.POST)
@ApiOperation(value = "List inlong stream briefs by paginating")
public Response<List<InlongStreamBriefInfo>> listByCondition(@RequestBody InlongStreamPageRequest request) {
// Lists brief inlong stream records for the page request and the current
// login user via streamService.listBrief; the result is wrapped in
// Response.success.
// The added checks run first: the request body (INVALID_PARAMETER with an
// explicit message) and the login user (LOGIN_USER_EMPTY).
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER,
+ "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(streamService.listBrief(request, LoginUserUtils.getLoginUser()));
}
@@ -73,6 +81,9 @@ public class OpenInLongStreamController {
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save inlong stream")
public Response<Integer> save(@RequestBody InlongStreamRequest request) {
// Saves an inlong stream on behalf of the current login user; the Integer
// returned by streamService.save is wrapped in Response.success.
// The request parameter carries no @Validated annotation here, so the only
// controller-level checks visible are the two added ones: request body
// (INVALID_PARAMETER with an explicit message) and login user
// (LOGIN_USER_EMPTY).
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER,
+ "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(streamService.save(request, LoginUserUtils.getLoginUser()));
}
@@ -80,6 +91,9 @@ public class OpenInLongStreamController {
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update inlong stream")
public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody InlongStreamRequest request) {
// Updates an inlong stream on behalf of the current login user; the Boolean
// returned by streamService.update is wrapped in Response.success.
// The request is validated with the UpdateValidation group; the added checks
// then verify the request body (INVALID_PARAMETER with an explicit message)
// and the login user (LOGIN_USER_EMPTY) before delegating.
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER,
+ "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(streamService.update(request, LoginUserUtils.getLoginUser()));
}
@@ -91,6 +105,9 @@ public class OpenInLongStreamController {
@ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
})
public Response<Boolean> delete(@RequestParam String groupId, @RequestParam String streamId) {
// Deletes the inlong stream identified by groupId + streamId on behalf of the
// current login user; the Boolean returned by streamService.delete is wrapped
// in Response.success.
// The added checks run first: groupId (GROUP_ID_IS_EMPTY) and streamId
// (STREAM_ID_IS_EMPTY) via checkBlank, then the login user (LOGIN_USER_EMPTY)
// via checkNull.
// NOTE(review): checkBlank/checkNull presumably throw on blank/null input;
// confirm in Preconditions.java.
+ Preconditions.checkBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+ Preconditions.checkBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(streamService.delete(groupId, streamId, LoginUserUtils.getLoginUser()));
}
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index c6048694c..b8561668d 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -17,7 +17,9 @@
package org.apache.inlong.manager.web.controller.openapi;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
@@ -57,12 +59,16 @@ public class OpenStreamSinkController {
@ApiOperation(value = "Get stream sink")
@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
public Response<StreamSink> get(@PathVariable Integer id) {
// Fetches one stream sink by id for the current login user and wraps the
// service result in Response.success.
// The added checks run first: id (INVALID_PARAMETER with the message
// "sink id is empty") and the login user (LOGIN_USER_EMPTY).
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(id, ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sinkService.get(id, LoginUserUtils.getLoginUser()));
}
@RequestMapping(value = "/sink/list", method = RequestMethod.GET)
@ApiOperation(value = "List stream sinks by paginating")
public Response<List<? extends StreamSink>> listByCondition(SinkPageRequest request) {
// Lists stream sinks matching the page request for the current login user and
// wraps the service result in Response.success.
// Unlike the other endpoints in this controller, this one is mapped to GET
// and its request parameter has no @RequestBody annotation.
// The added checks run first: request (INVALID_PARAMETER with an explicit
// message) and the login user (LOGIN_USER_EMPTY).
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sinkService.listByCondition(request, LoginUserUtils.getLoginUser()));
}
@@ -70,6 +76,8 @@ public class OpenStreamSinkController {
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save stream sink")
public Response<Integer> save(@Validated @RequestBody SinkRequest request) {
// Saves a stream sink on behalf of the current login user; the Integer
// returned by sinkService.save is wrapped in Response.success.
// The request is annotated @Validated without a group; the added checks then
// verify the request body (INVALID_PARAMETER with an explicit message) and
// the login user (LOGIN_USER_EMPTY) before delegating.
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sinkService.save(request, LoginUserUtils.getLoginUser()));
}
@@ -77,6 +85,8 @@ public class OpenStreamSinkController {
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update stream sink")
public Response<Boolean> update(@Validated(UpdateByIdValidation.class) @RequestBody SinkRequest request) {
// Updates a stream sink on behalf of the current login user; the Boolean
// returned by sinkService.update is wrapped in Response.success.
// The request is validated with the UpdateByIdValidation group; the added
// checks then verify the request body (INVALID_PARAMETER with an explicit
// message) and the login user (LOGIN_USER_EMPTY) before delegating.
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sinkService.update(request, LoginUserUtils.getLoginUser()));
}
@@ -89,6 +99,8 @@ public class OpenStreamSinkController {
})
public Response<Boolean> delete(@PathVariable Integer id,
@RequestParam(required = false, defaultValue = "false") boolean startProcess) {
// Deletes the stream sink with the given id on behalf of the current login
// user; the Boolean returned by sinkService.delete is wrapped in
// Response.success.
// startProcess is an optional request parameter defaulting to "false" and is
// passed through unchanged to sinkService.delete.
// The added checks run first: id (INVALID_PARAMETER with the message
// "sink id is empty") and the login user (LOGIN_USER_EMPTY).
// NOTE(review): Preconditions.checkNull presumably throws on a null argument;
// confirm in Preconditions.java.
+ Preconditions.checkNull(id, ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sinkService.delete(id, startProcess, LoginUserUtils.getLoginUser()));
}
}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
index 1214079b6..b8ea2f44b 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
@@ -118,7 +118,7 @@ class DataNodeControllerTest extends WebBaseTest {
// insert the test data
DataNodeEntity nodeEntity = new DataNodeEntity();
nodeEntity.setName("test");
- nodeEntity.setType("MYSQL");
+ nodeEntity.setType(DataNodeType.HIVE);
nodeEntity.setIsDeleted(0);
nodeEntity.setModifier("test");
nodeEntity.setCreator("test");