You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/16 07:35:04 UTC

[inlong] branch master updated: [INLONG-7229][Manager] Add checks for unmodifiable field values (#7230)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 64bc316c4 [INLONG-7229][Manager] Add checks for unmodifiable field values (#7230)
64bc316c4 is described below

commit 64bc316c4e1146cd36c0acd184bf69b75568153b
Author: Goson Zhang <46...@qq.com>
AuthorDate: Mon Jan 16 15:34:59 2023 +0800

    [INLONG-7229][Manager] Add checks for unmodifiable field values (#7230)
---
 .../inlong/manager/common/util/Preconditions.java  |  38 +++++++
 .../service/group/InlongGroupProcessService.java   |  10 --
 .../service/group/InlongGroupServiceImpl.java      |  54 +++-------
 .../manager/service/node/DataNodeServiceImpl.java  |  59 ++++-------
 .../service/sink/StreamSinkServiceImpl.java        | 111 ++++++++-------------
 .../service/stream/InlongStreamServiceImpl.java    |  53 +---------
 .../controller/openapi/OpenDataNodeController.java |  12 +++
 .../openapi/OpenInLongGroupController.java         |  16 +++
 .../openapi/OpenInLongStreamController.java        |  17 ++++
 .../openapi/OpenStreamSinkController.java          |  12 +++
 .../web/controller/DataNodeControllerTest.java     |   2 +-
 11 files changed, 175 insertions(+), 209 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
index 725d095dc..4def9ead8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/Preconditions.java
@@ -18,11 +18,14 @@
 package org.apache.inlong.manager.common.util;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.Supplier;
 
@@ -125,6 +128,41 @@ public class Preconditions {
         }
     }
 
+    public static void checkBlank(String obj, ErrorCodeEnum errorCodeEnum) {
+        if (StringUtils.isBlank(obj)) {
+            throw new BusinessException(errorCodeEnum);
+        }
+    }
+
+    public static void checkBlank(String obj, ErrorCodeEnum errorCodeEnum, String errMsg) {
+        if (StringUtils.isBlank(obj)) {
+            throw new BusinessException(errorCodeEnum, errMsg);
+        }
+    }
+
+    public static void checkNull(Object obj, ErrorCodeEnum errorCodeEnum) {
+        if (obj == null) {
+            throw new BusinessException(errorCodeEnum);
+        }
+    }
+
+    public static void checkNull(Object obj, ErrorCodeEnum errorCodeEnum, String errMsg) {
+        if (obj == null) {
+            throw new BusinessException(errorCodeEnum, errMsg);
+        }
+    }
+
+    public static void chkNotEquals(Object a, Object b, ErrorCodeEnum errorCodeEnum) {
+        if (!Objects.equals(a, b)) {
+            throw new BusinessException(errorCodeEnum);
+        }
+    }
+
+    public static void chkNotEquals(Object a, Object b, ErrorCodeEnum errorCodeEnum, String errMsg) {
+        if (!Objects.equals(a, b)) {
+            throw new BusinessException(errorCodeEnum, errMsg);
+        }
+    }
     /**
      * Whether a target string is in a string separated by the separator.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index e6f269f78..576618983 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.group;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
@@ -233,15 +232,6 @@ public class InlongGroupProcessService {
      * Delete InlongGroup logically and delete related resource in a synchronous way.
      */
     public Boolean deleteProcess(String groupId, UserInfo opInfo) {
-        // check groupId parameter
-        if (StringUtils.isBlank(groupId)) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "inlong group id in request cannot be blank");
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
             return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index e3d768f1e..0eeb312f0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -177,15 +177,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public String save(InlongGroupRequest request, UserInfo opInfo) {
-        // check parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY,
-                    "inlong group request cannot be empty");
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         String groupId = request.getInlongGroupId();
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity != null) {
@@ -232,14 +223,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
 
     @Override
     public InlongGroupInfo get(String groupId, UserInfo opInfo) {
-        // check group id
-        if (StringUtils.isBlank(groupId)) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -372,14 +355,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
 
     @Override
     public List<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request, UserInfo opInfo) {
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "group query request cannot be empty");
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // filter records;
         List<InlongGroupEntity> filterGroupEntities = new ArrayList<>();
         for (InlongGroupEntity groupEntity : groupMapper.selectByCondition(request)) {
@@ -424,11 +399,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
             LOGGER.error("inlong group not found by groupId={}", groupId);
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
         }
-        if (!Objects.equals(entity.getVersion(), request.getVersion())) {
-            LOGGER.error("inlong group has already updated with groupId={}, curVersion={}",
-                    groupId, request.getVersion());
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
+        chkUnmodifiableParams(entity, request);
         // check whether the current status can be modified
         doUpdateCheck(entity, request, operator);
 
@@ -446,22 +417,12 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     @Override
     @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRES_NEW)
     public String update(InlongGroupRequest request, UserInfo opInfo) {
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "group query request cannot be empty");
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         String groupId = request.getInlongGroupId();
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
         }
-        if (!Objects.equals(entity.getVersion(), request.getVersion())) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
+        chkUnmodifiableParams(entity, request);
         // only the person in charges can query
         if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
             List<String> inCharges = Arrays.asList(entity.getInCharges().split(InlongConstants.COMMA));
@@ -697,4 +658,15 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         }
         return sortConf;
     }
+
+    private void chkUnmodifiableParams(InlongGroupEntity entity, InlongGroupRequest request) {
+        // check mqType
+        Preconditions.chkNotEquals(entity.getMqType(), request.getMqType(),
+                ErrorCodeEnum.INVALID_PARAMETER, "mqType not allowed modify");
+        // check record version
+        Preconditions.chkNotEquals(entity.getVersion(), request.getVersion(),
+                ErrorCodeEnum.CONFIG_EXPIRED,
+                String.format("record has expired with record version=%d, request version=%d",
+                        entity.getVersion(), request.getVersion()));
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index 614ff8ca2..130709ce8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.UserTypeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -76,14 +77,6 @@ public class DataNodeServiceImpl implements DataNodeService {
 
     @Override
     public Integer save(DataNodeRequest request, UserInfo opInfo) {
-        // check request parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // only the person in charges can query
         if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
             throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -118,13 +111,6 @@ public class DataNodeServiceImpl implements DataNodeService {
 
     @Override
     public DataNodeInfo get(Integer id, UserInfo opInfo) {
-        if (id == null) {
-            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // only the person in charges can query
         if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
             throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -133,8 +119,7 @@ public class DataNodeServiceImpl implements DataNodeService {
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
         }
-        String dataNodeType = entity.getType();
-        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType);
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(entity.getType());
         return dataNodeOperator.getFromEntity(entity);
     }
 
@@ -172,14 +157,6 @@ public class DataNodeServiceImpl implements DataNodeService {
 
     @Override
     public List<DataNodeInfo> list(DataNodePageRequest request, UserInfo opInfo) {
-        // check request parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // only the person in charges can query
         if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
             throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -203,6 +180,8 @@ public class DataNodeServiceImpl implements DataNodeService {
             throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
                     String.format("data node record not found by id=%d", request.getId()));
         }
+        // check whether modify unmodifiable parameters
+        chkUnmodifiableParams(curEntity, request);
         // Check whether the data node name exists with the same name and type
         if (request.getName() != null) {
             if (StringUtils.isBlank(request.getName())) {
@@ -227,14 +206,6 @@ public class DataNodeServiceImpl implements DataNodeService {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean update(DataNodeRequest request, UserInfo opInfo) {
-        // check request parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // only the person in charges can query
         if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
             throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -245,6 +216,8 @@ public class DataNodeServiceImpl implements DataNodeService {
             throw new BusinessException(ErrorCodeEnum.RECORD_NOT_FOUND,
                     String.format("data node record not found by id=%d", request.getId()));
         }
+        // check whether modify unmodifiable parameters
+        chkUnmodifiableParams(curEntity, request);
         // Check whether the data node name exists with the same name and type
         if (request.getName() != null) {
             if (StringUtils.isBlank(request.getName())) {
@@ -278,7 +251,6 @@ public class DataNodeServiceImpl implements DataNodeService {
             LOGGER.error(errMsg);
             throw new BusinessException(errMsg);
         }
-
         request.setId(entity.getId());
         Boolean result = this.update(request, operator);
         LOGGER.info("success to update data node by key: {}", request);
@@ -298,14 +270,6 @@ public class DataNodeServiceImpl implements DataNodeService {
 
     @Override
     public Boolean delete(Integer id, UserInfo opInfo) {
-        // check id parameter
-        if (id == null) {
-            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // only the person in charges can query
         if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
             throw new BusinessException(ErrorCodeEnum.PERMISSION_REQUIRED);
@@ -339,6 +303,17 @@ public class DataNodeServiceImpl implements DataNodeService {
         return true;
     }
 
+    private void chkUnmodifiableParams(DataNodeEntity curEntity, DataNodeRequest request) {
+        // check type
+        Preconditions.chkNotEquals(curEntity.getType(), request.getType(),
+                ErrorCodeEnum.INVALID_PARAMETER, "type not allowed modify");
+        // check record version
+        Preconditions.chkNotEquals(curEntity.getVersion(), request.getVersion(),
+                ErrorCodeEnum.CONFIG_EXPIRED,
+                String.format("record has expired with record version=%d, request version=%d",
+                        curEntity.getVersion(), request.getVersion()));
+    }
+
     @Override
     public Boolean deleteByKey(String name, String type, String operator) {
         DataNodeEntity entity = dataNodeMapper.selectByUniqueKey(name, type);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index fa1afaa0a..621afda19 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -68,7 +68,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -152,10 +151,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     public Integer save(SinkRequest request, UserInfo opInfo) {
         // check request parameter
         checkSinkRequestParams(request);
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         InlongGroupEntity entity = groupMapper.selectByGroupId(request.getInlongGroupId());
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
@@ -227,14 +222,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
     @Override
     public StreamSink get(Integer id, UserInfo opInfo) {
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
-        // check sink id
-        if (id == null) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
-        }
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
@@ -327,18 +314,10 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
     @Override
     public List<? extends StreamSink> listByCondition(SinkPageRequest request, UserInfo opInfo) {
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "group query request cannot be empty");
-        }
         // check sink id
         if (StringUtils.isBlank(request.getInlongGroupId())) {
             throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
         }
-        // check opInfo
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // query result
         List<StreamSinkEntity> sinkEntityList = sinkMapper.selectByCondition(request);
         Map<String, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
@@ -372,24 +351,31 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Transactional(rollbackFor = Throwable.class)
     public Boolean update(SinkRequest request, String operator) {
         LOGGER.info("begin to update sink by id: {}", request);
-        this.checkParams(request);
-        Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
-
-        // Check if it can be modified
-        String groupId = request.getInlongGroupId();
-        String streamId = request.getInlongStreamId();
-        String sinkName = request.getSinkName();
-        groupCheckService.checkGroupStatus(groupId, operator);
-
+        if (request == null) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "inlong sink request is empty");
+        }
+        if (request.getId() == null) {
+            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
+        }
+        StreamSinkEntity curEntity = sinkMapper.selectByPrimaryKey(request.getId());
+        if (curEntity == null) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
+        }
+        chkUnmodifiableParams(curEntity, request);
+        groupCheckService.checkGroupStatus(request.getInlongGroupId(), operator);
         // Check whether the stream exist or not
-        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
+                request.getInlongGroupId(), request.getInlongStreamId());
         Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
 
         // Check whether the sink name exists with the same groupId and streamId
-        StreamSinkEntity existEntity = sinkMapper.selectByUniqueKey(groupId, streamId, sinkName);
+        StreamSinkEntity existEntity = sinkMapper.selectByUniqueKey(
+                request.getInlongGroupId(), request.getInlongStreamId(), request.getSinkName());
         if (existEntity != null && !existEntity.getId().equals(request.getId())) {
             String errMsg = "sink name=%s already exists with the groupId=%s streamId=%s";
-            throw new BusinessException(String.format(errMsg, sinkName, groupId, streamId));
+            throw new BusinessException(String.format(errMsg,
+                    request.getSinkName(), request.getInlongGroupId(), request.getInlongStreamId()));
         }
 
         SinkStatus nextStatus = null;
@@ -404,7 +390,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
         // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
         if (streamSuccess && request.getStartProcess()) {
-            this.startProcessForSink(groupId, streamId, operator);
+            this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), operator);
         }
 
         LOGGER.info("success to update sink by id: {}", request);
@@ -417,32 +403,11 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         if (request.getId() == null) {
             throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
         }
-        // check opInfo
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         StreamSinkEntity curEntity = sinkMapper.selectByPrimaryKey(request.getId());
         if (curEntity == null) {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
         }
-        if (!Objects.equals(curEntity.getVersion(), request.getVersion())) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                    String.format("sink has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
-                            curEntity.getInlongGroupId(), curEntity.getInlongStreamId(), curEntity.getSinkName(),
-                            curEntity.getVersion()));
-        }
-        if (StringUtils.isNotBlank(request.getInlongGroupId())
-                && !curEntity.getInlongGroupId().equals(request.getInlongGroupId())) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "InlongGroupId not allowed modify");
-        }
-        if (StringUtils.isNotBlank(request.getInlongStreamId())
-                && !curEntity.getInlongStreamId().equals(request.getInlongStreamId())) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "InlongStreamId not allowed modify");
-        }
-        request.setInlongGroupId(curEntity.getInlongGroupId());
-        request.setInlongStreamId(curEntity.getInlongStreamId());
+        chkUnmodifiableParams(curEntity, request);
         // check group record
         InlongGroupEntity curGroupEntity = groupMapper.selectByGroupId(curEntity.getInlongGroupId());
         if (curGroupEntity == null) {
@@ -510,7 +475,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             LOGGER.error(errMsg);
             throw new BusinessException(errMsg);
         }
-
         request.setId(entity.getId());
         Boolean result = this.update(request, operator);
         LOGGER.info("success to update sink by key: {}", request);
@@ -552,13 +516,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean delete(Integer id, Boolean startProcess, UserInfo opInfo) {
-        if (id == null) {
-            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
-        }
-        // check opInfo
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // check stream sink record
         StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
         if (sinkEntity == null) {
@@ -729,9 +686,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
     private void checkSinkRequestParams(SinkRequest request) {
         // check request parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
-        }
         // check group id
         String groupId = request.getInlongGroupId();
         if (StringUtils.isBlank(groupId)) {
@@ -787,4 +741,27 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         streamProcessOperation.deleteProcess(groupId, streamId, operator, false);
         LOGGER.debug("success to start the delete-stream-process for groupId={} streamId={}", groupId, streamId);
     }
+
+    private void chkUnmodifiableParams(StreamSinkEntity curEntity, SinkRequest request) {
+        // check type
+        Preconditions.chkNotEquals(curEntity.getSinkType(), request.getSinkType(),
+                ErrorCodeEnum.INVALID_PARAMETER, "sinkType not allowed modify");
+        // check record version
+        Preconditions.chkNotEquals(curEntity.getVersion(), request.getVersion(),
+                ErrorCodeEnum.CONFIG_EXPIRED,
+                String.format("record has expired with record version=%d, request version=%d",
+                        curEntity.getVersion(), request.getVersion()));
+        if (StringUtils.isNotBlank(request.getInlongGroupId())
+                && !curEntity.getInlongGroupId().equals(request.getInlongGroupId())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "InlongGroupId not allowed modify");
+        }
+        if (StringUtils.isNotBlank(request.getInlongStreamId())
+                && !curEntity.getInlongStreamId().equals(request.getInlongStreamId())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "InlongStreamId not allowed modify");
+        }
+        request.setInlongGroupId(curEntity.getInlongGroupId());
+        request.setInlongStreamId(curEntity.getInlongStreamId());
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 1bd605647..0ae06efe7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -141,14 +141,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     @Override
     public Integer save(InlongStreamRequest request, UserInfo opInfo) {
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "inlong stream info is empty");
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         InlongGroupEntity entity = groupMapper.selectByGroupId(request.getInlongGroupId());
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -233,18 +225,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     @Override
     public InlongStreamInfo get(String groupId, String streamId, UserInfo opInfo) {
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
-        // check group id
-        if (StringUtils.isBlank(groupId)) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
-        }
-        // check stream id
-        if (StringUtils.isBlank(streamId)) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
-        }
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
@@ -341,14 +321,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
     @Override
     public List<InlongStreamBriefInfo> listBrief(InlongStreamPageRequest request, UserInfo opInfo) {
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "group query request cannot be empty");
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         request.setCurrentUser(LoginUserUtils.getLoginUser().getName());
         request.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.ADMIN));
         return CommonBeanUtils.copyListProperties(streamMapper.selectByCondition(request), InlongStreamBriefInfo::new);
@@ -436,18 +408,15 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean update(InlongStreamRequest request, UserInfo opInfo) {
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "inlong stream request is empty");
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         InlongGroupEntity entity = groupMapper.selectByGroupId(request.getInlongGroupId());
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
         }
+        // check record version
+        Preconditions.chkNotEquals(entity.getVersion(), request.getVersion(),
+                ErrorCodeEnum.CONFIG_EXPIRED,
+                String.format("record has expired with record version=%d, request version=%d",
+                        entity.getVersion(), request.getVersion()));
         // only the person in charges can query
         if (!opInfo.getRoles().contains(UserTypeEnum.ADMIN.name())) {
             List<String> inCharges = Arrays.asList(entity.getInCharges().split(InlongConstants.COMMA));
@@ -581,18 +550,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean delete(String groupId, String streamId, UserInfo opInfo) {
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
-        // check group id
-        if (StringUtils.isBlank(groupId)) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
-        }
-        // check stream id
-        if (StringUtils.isBlank(streamId)) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
-        }
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
         if (groupEntity == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java
index 9fd214166..bd32fbda6 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
 import org.apache.inlong.manager.pojo.common.Response;
@@ -58,12 +60,16 @@ public class OpenDataNodeController {
     @ApiOperation(value = "Get node by id")
     @ApiImplicitParam(name = "id", value = "Data node ID", dataTypeClass = Integer.class, required = true)
     public Response<DataNodeInfo> get(@PathVariable Integer id) {
+        Preconditions.checkNull(id, ErrorCodeEnum.ID_IS_EMPTY);
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(dataNodeService.get(id, LoginUserUtils.getLoginUser()));
     }
 
     @PostMapping(value = "/node/list")
     @ApiOperation(value = "List data node")
     public Response<List<DataNodeInfo>> list(@RequestBody DataNodePageRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY);
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(dataNodeService.list(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -71,6 +77,8 @@ public class OpenDataNodeController {
     @ApiOperation(value = "Save node")
     @OperationLog(operation = OperationType.CREATE)
     public Response<Integer> save(@Validated(SaveValidation.class) @RequestBody DataNodeRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY);
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(dataNodeService.save(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -78,6 +86,8 @@ public class OpenDataNodeController {
     @ApiOperation(value = "Update data node")
     @OperationLog(operation = OperationType.UPDATE)
     public Response<Boolean> update(@Validated(UpdateByIdValidation.class) @RequestBody DataNodeRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY);
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(dataNodeService.update(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -86,6 +96,8 @@ public class OpenDataNodeController {
     @OperationLog(operation = OperationType.DELETE)
     @ApiImplicitParam(name = "id", value = "Data node ID", dataTypeClass = Integer.class, required = true)
     public Response<Boolean> delete(@PathVariable Integer id) {
+        Preconditions.checkNull(id, ErrorCodeEnum.ID_IS_EMPTY);
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(dataNodeService.delete(id, LoginUserUtils.getLoginUser()));
     }
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
index 66e39f0e8..c279997a5 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.common.Response;
@@ -63,12 +65,17 @@ public class OpenInLongGroupController {
     @ApiOperation(value = "Get InLong group information by groupId")
     @ApiImplicitParam(name = "groupId", value = "InLong Group ID", dataTypeClass = String.class, required = true)
     public Response<InlongGroupInfo> get(@PathVariable String groupId) {
+        Preconditions.checkBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(groupService.get(groupId, LoginUserUtils.getLoginUser()));
     }
 
     @PostMapping(value = "/group/list")
     @ApiOperation(value = "List inlong groups by paginating")
     public Response<List<InlongGroupBriefInfo>> listBrief(@RequestBody InlongGroupPageRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER,
+                "group query request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(groupService.listBrief(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -76,6 +83,9 @@ public class OpenInLongGroupController {
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save inlong group")
     public Response<String> save(@Validated(SaveValidation.class) @RequestBody InlongGroupRequest groupRequest) {
+        Preconditions.checkNull(groupRequest, ErrorCodeEnum.REQUEST_IS_EMPTY,
+                "inlong group request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(groupService.save(groupRequest, LoginUserUtils.getLoginUser()));
     }
 
@@ -83,6 +93,9 @@ public class OpenInLongGroupController {
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update inlong group")
     public Response<String> update(@Validated(UpdateValidation.class) @RequestBody InlongGroupRequest groupRequest) {
+        Preconditions.checkNull(groupRequest, ErrorCodeEnum.REQUEST_IS_EMPTY,
+                "group query request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(groupService.update(groupRequest, LoginUserUtils.getLoginUser()));
     }
 
@@ -91,6 +104,9 @@ public class OpenInLongGroupController {
     @OperationLog(operation = OperationType.DELETE)
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class, required = true)
     public Response<Boolean> delete(@PathVariable String groupId) {
+        Preconditions.checkBlank(groupId, ErrorCodeEnum.INVALID_PARAMETER,
+                "inlong group id in request cannot be blank");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(groupProcessOperation.deleteProcess(groupId, LoginUserUtils.getLoginUser()));
     }
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
index 031e54ca9..92786b1b2 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -60,12 +62,18 @@ public class OpenInLongStreamController {
             @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
     })
     public Response<InlongStreamInfo> get(@RequestParam String groupId, @RequestParam String streamId) {
+        Preconditions.checkBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        Preconditions.checkBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(streamService.get(groupId, streamId, LoginUserUtils.getLoginUser()));
     }
 
     @RequestMapping(value = "/stream/list", method = RequestMethod.POST)
     @ApiOperation(value = "List inlong stream briefs by paginating")
     public Response<List<InlongStreamBriefInfo>> listByCondition(@RequestBody InlongStreamPageRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER,
+                "request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(streamService.listBrief(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -73,6 +81,9 @@ public class OpenInLongStreamController {
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save inlong stream")
     public Response<Integer> save(@RequestBody InlongStreamRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER,
+                "request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(streamService.save(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -80,6 +91,9 @@ public class OpenInLongStreamController {
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update inlong stream")
     public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody InlongStreamRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER,
+                "request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(streamService.update(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -91,6 +105,9 @@ public class OpenInLongStreamController {
             @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true)
     })
     public Response<Boolean> delete(@RequestParam String groupId, @RequestParam String streamId) {
+        Preconditions.checkBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        Preconditions.checkBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(streamService.delete(groupId, streamId, LoginUserUtils.getLoginUser()));
     }
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index c6048694c..b8561668d 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
@@ -57,12 +59,16 @@ public class OpenStreamSinkController {
     @ApiOperation(value = "Get stream sink")
     @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
     public Response<StreamSink> get(@PathVariable Integer id) {
+        Preconditions.checkNull(id, ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sinkService.get(id, LoginUserUtils.getLoginUser()));
     }
 
     @RequestMapping(value = "/sink/list", method = RequestMethod.GET)
     @ApiOperation(value = "List stream sinks by paginating")
     public Response<List<? extends StreamSink>> listByCondition(SinkPageRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sinkService.listByCondition(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -70,6 +76,8 @@ public class OpenStreamSinkController {
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save stream sink")
     public Response<Integer> save(@Validated @RequestBody SinkRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sinkService.save(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -77,6 +85,8 @@ public class OpenStreamSinkController {
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update stream sink")
     public Response<Boolean> update(@Validated(UpdateByIdValidation.class) @RequestBody SinkRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sinkService.update(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -89,6 +99,8 @@ public class OpenStreamSinkController {
     })
     public Response<Boolean> delete(@PathVariable Integer id,
             @RequestParam(required = false, defaultValue = "false") boolean startProcess) {
+        Preconditions.checkNull(id, ErrorCodeEnum.INVALID_PARAMETER, "sink id is empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sinkService.delete(id, startProcess, LoginUserUtils.getLoginUser()));
     }
 }
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
index 1214079b6..b8ea2f44b 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
@@ -118,7 +118,7 @@ class DataNodeControllerTest extends WebBaseTest {
         // insert the test data
         DataNodeEntity nodeEntity = new DataNodeEntity();
         nodeEntity.setName("test");
-        nodeEntity.setType("MYSQL");
+        nodeEntity.setType(DataNodeType.HIVE);
         nodeEntity.setIsDeleted(0);
         nodeEntity.setModifier("test");
         nodeEntity.setCreator("test");