You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/01/29 01:52:55 UTC

[inlong] branch master updated: [INLONG-7261][Manager] Optimize OpenStreamTransformController implementation (#7262)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e2718778 [INLONG-7261][Manager] Optimize OpenStreamTransformController implementation (#7262)
5e2718778 is described below

commit 5e27187788b382e7ea8e1843be88980557148e42
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Jan 29 09:52:50 2023 +0800

    [INLONG-7261][Manager] Optimize OpenStreamTransformController implementation (#7262)
---
 .../manager/common/validation/SaveValidation.java  |   2 +-
 .../pojo/transform/DeleteTransformRequest.java     |  15 ++-
 .../manager/pojo/transform/TransformRequest.java   |  20 ++--
 .../service/source/StreamSourceServiceImpl.java    |  62 ++--------
 .../transform/StreamTransformServiceImpl.java      | 129 ++++++++-------------
 .../openapi/OpenStreamSourceController.java        |  12 ++
 .../openapi/OpenStreamTransformController.java     |  10 ++
 7 files changed, 101 insertions(+), 149 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/validation/SaveValidation.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/validation/SaveValidation.java
index 832af0a0c..d5922e352 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/validation/SaveValidation.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/validation/SaveValidation.java
@@ -22,7 +22,7 @@ import javax.validation.groups.Default;
 /**
  * Used for validate add request fields group
  *
- * @see UpdateValidation
+ * @see SaveValidation
  */
 public interface SaveValidation extends Default {
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/DeleteTransformRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/DeleteTransformRequest.java
index 564dffcd0..20e176fdc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/DeleteTransformRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/DeleteTransformRequest.java
@@ -20,8 +20,9 @@ package org.apache.inlong.manager.pojo.transform;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-
+import org.hibernate.validator.constraints.Length;
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
 
 /**
  * Delete request of transform
@@ -30,16 +31,22 @@ import javax.validation.constraints.NotBlank;
 @ApiModel("Delete request of stream transform")
 public class DeleteTransformRequest {
 
-    @NotBlank(message = "inlongGroupId cannot be blank")
     @ApiModelProperty("Inlong group id")
+    @NotBlank(message = "inlongGroupId cannot be blank")
+    @Length(min = 4, max = 100, message = "length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
-    @NotBlank(message = "inlongStreamId cannot be blank")
     @ApiModelProperty("Inlong stream id")
+    @NotBlank(message = "inlongStreamId cannot be blank")
+    @Length(min = 4, max = 100, message = "inlongStreamId length must be between 4 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only supports lowercase letters, numbers, '-', or '_'")
     private String inlongStreamId;
 
-    @NotBlank(message = "transformName cannot be blank")
     @ApiModelProperty("Transform name, unique in one stream")
+    @NotBlank(message = "transformName cannot be blank")
+    @Length(min = 1, max = 100, message = "transformName length must be between 1 and 100")
+    @Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "transformName only supports lowercase letters, numbers, '-', or '_'")
     private String transformName;
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java
index 5895d5d44..a57679a54 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.pojo.transform;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.hibernate.validator.constraints.Length;
@@ -36,49 +37,50 @@ import java.util.List;
 @ApiModel("Stream transform request")
 public class TransformRequest {
 
-    @NotNull(groups = UpdateValidation.class)
     @ApiModelProperty(value = "Primary key")
+    @NotNull(groups = UpdateValidation.class, message = "id cannot be null")
     private Integer id;
 
-    @NotBlank(message = "inlongGroupId cannot be blank")
     @ApiModelProperty("Inlong group id")
+    @NotBlank(groups = SaveValidation.class, message = "inlongGroupId cannot be blank")
     @Length(min = 4, max = 100, message = "length must be between 4 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
     private String inlongGroupId;
 
-    @NotBlank(message = "inlongStreamId cannot be blank")
     @ApiModelProperty("Inlong stream id")
+    @NotBlank(groups = SaveValidation.class, message = "inlongStreamId cannot be blank")
     @Length(min = 4, max = 100, message = "inlongStreamId length must be between 4 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only supports lowercase letters, numbers, '-', or '_'")
     private String inlongStreamId;
 
-    @NotBlank(message = "transformName cannot be blank")
+    @ApiModelProperty("Transform name, unique in one stream")
+    @NotBlank(groups = SaveValidation.class, message = "transformName cannot be blank")
     @Length(min = 1, max = 100, message = "transformName length must be between 1 and 100")
     @Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "transformName only supports lowercase letters, numbers, '-', or '_'")
-    @ApiModelProperty("Transform name, unique in one stream")
     private String transformName;
 
-    @NotBlank(message = "transformType cannot be blank")
     @ApiModelProperty("Transform type, including: splitter, filter, joiner, etc.")
+    @NotBlank(groups = SaveValidation.class, message = "transformType cannot be blank")
     @Length(max = 15, message = "length must be less than or equal to 15")
     private String transformType;
 
-    @NotBlank(message = "preNodeNames cannot be blank")
     @ApiModelProperty("Pre node names of transform in this stream, join by ','")
+    @NotBlank(message = "preNodeNames cannot be blank")
     @Length(min = 1, max = 1638400, message = "length must be between 1 and 1638400")
     private String preNodeNames;
 
-    @NotBlank(message = "postNodeNames cannot be blank")
     @ApiModelProperty("Post node names of transform in this stream, join by ','")
+    @NotBlank(message = "postNodeNames cannot be blank")
     @Length(min = 1, max = 1638400, message = "length must be between 1 and 1638400")
     private String postNodeNames;
 
-    @NotBlank(message = "transformDefinition cannot be blank")
     @ApiModelProperty("Transform definition in json type")
+    @NotBlank(message = "transformDefinition cannot be blank")
     @Length(min = 1, max = 1638400, message = "length must be between 1 and 1638400")
     private String transformDefinition;
 
     @ApiModelProperty("Version of transform")
+    @NotNull(groups = UpdateValidation.class, message = "version cannot be null")
     private Integer version;
 
     @ApiModelProperty(value = "Field list")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 8f991bb47..0e7c3c355 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -65,7 +65,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -124,14 +123,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
     public Integer save(SourceRequest request, UserInfo opInfo) {
-        // check request parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // Check if it can be added
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
         if (groupEntity == null) {
@@ -195,14 +186,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
     @Override
     public StreamSource get(Integer id, UserInfo opInfo) {
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
-        // check source id
-        if (id == null) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "source id is empty");
-        }
         StreamSourceEntity entity = sourceMapper.selectById(id);
         if (entity == null) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
@@ -298,17 +281,9 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
     @Override
     public PageResult<? extends StreamSource> listByCondition(SourcePageRequest request, UserInfo opInfo) {
-        // check request parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
-        }
         if (StringUtils.isBlank(request.getInlongGroupId())) {
             throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
         }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         OrderFieldEnum.checkOrderField(request);
         OrderTypeEnum.checkOrderType(request);
@@ -361,7 +336,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     public Boolean update(SourceRequest request, String operator) {
         LOGGER.info("begin to update source info: {}", request);
         // check request parameter
-        checkRequestParams(request);
+        chkUnmodifiableParams(request);
 
         // Check if it can be modified
         String groupId = request.getInlongGroupId();
@@ -382,12 +357,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
     public Boolean update(SourceRequest request, UserInfo opInfo) {
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // check request parameter
-        checkRequestParams(request);
+        chkUnmodifiableParams(request);
         // Check if it can be update
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
         if (groupEntity == null) {
@@ -466,13 +437,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
     public Boolean delete(Integer id, UserInfo opInfo) {
-        if (id == null) {
-            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
-        }
-        // check opInfo
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         if (entity == null) {
             return true;
@@ -634,11 +598,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         Preconditions.checkNotNull(sourceName, ErrorCodeEnum.SOURCE_NAME_IS_NULL.getMessage());
     }
 
-    private void checkRequestParams(SourceRequest request) {
-        // check request parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
-        }
+    private void chkUnmodifiableParams(SourceRequest request) {
         // check record exists
         StreamSourceEntity entity = sourceMapper.selectById(request.getId());
         if (entity == null) {
@@ -646,17 +606,13 @@ public class StreamSourceServiceImpl implements StreamSourceService {
                     String.format("not found source record by id=%d", request.getId()));
         }
         // check whether modify sourceType
-        if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
-            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
-                    "sourceType not allowed modify");
-        }
+        Preconditions.chkNotEquals(entity.getSourceType(), request.getSourceType(),
+                ErrorCodeEnum.INVALID_PARAMETER, "sourceType not allowed modify");
         // check record version
-        if (!Objects.equals(entity.getVersion(), request.getVersion())) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                    String.format("source has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
-                            request.getInlongGroupId(), request.getInlongStreamId(), request.getSourceName(),
-                            request.getVersion()));
-        }
+        Preconditions.chkNotEquals(entity.getVersion(), request.getVersion(),
+                ErrorCodeEnum.CONFIG_EXPIRED,
+                String.format("record has expired with record version=%d, request version=%d",
+                        entity.getVersion(), request.getVersion()));
         // check whether modify groupId
         if (StringUtils.isNotBlank(request.getInlongGroupId())
                 && !entity.getInlongGroupId().equals(request.getInlongGroupId())) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 89fb0a389..2e27e96a8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -100,12 +100,6 @@ public class StreamTransformServiceImpl implements StreamTransformService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
     public Integer save(TransformRequest request, UserInfo opInfo) {
-        // check request and parameters
-        this.checkRequestParams(request);
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // Check if it can be added
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
         if (groupEntity == null) {
@@ -122,7 +116,8 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         // check inlong group status
         GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
         if (GroupStatus.notAllowedUpdate(status)) {
-            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
+            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+                    String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
         }
         // Check if the record to be added exists
         List<StreamTransformEntity> transformEntities =
@@ -145,7 +140,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
 
     @Override
     public List<TransformResponse> listTransform(String groupId, String streamId) {
-        LOGGER.info("begin to fetch transform info by groupId={} and streamId={} ", groupId, streamId);
+        LOGGER.debug("begin to fetch transform info by groupId={} and streamId={} ", groupId, streamId);
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         List<StreamTransformEntity> entityList = transformMapper.selectByRelatedId(groupId, streamId, null);
         if (CollectionUtils.isEmpty(entityList)) {
@@ -177,14 +172,6 @@ public class StreamTransformServiceImpl implements StreamTransformService {
 
     @Override
     public List<TransformResponse> listTransform(String groupId, String streamId, UserInfo opInfo) {
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
-        // check group id
-        if (StringUtils.isBlank(groupId)) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
-        }
         // Check if it can be added
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
         if (groupEntity == null) {
@@ -231,28 +218,21 @@ public class StreamTransformServiceImpl implements StreamTransformService {
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
     public Boolean update(TransformRequest request, String operator) {
         LOGGER.info("begin to update transform info: {}", request);
-        this.checkParams(request);
+        // check request and parameters
+        this.chkUnmodifiableParams(request);
         // Check whether the transform can be modified
         String groupId = request.getInlongGroupId();
         groupCheckService.checkGroupStatus(groupId, operator);
         Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
-        StreamTransformEntity exist = transformMapper.selectById(request.getId());
-        if (exist == null) {
-            LOGGER.error("transform not found by id={}", request.getId());
-            throw new BusinessException(ErrorCodeEnum.TRANSFORM_NOT_FOUND);
-        }
-        String msg = String.format("transform has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
-                request.getInlongGroupId(), request.getInlongStreamId(),
-                request.getTransformName(), request.getVersion());
-        if (!exist.getVersion().equals(request.getVersion())) {
-            LOGGER.error(msg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
         StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(request,
                 StreamTransformEntity::new);
         transformEntity.setModifier(operator);
         int rowCount = transformMapper.updateByIdSelective(transformEntity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            String msg =
+                    String.format("transform has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
+                            request.getInlongGroupId(), request.getInlongStreamId(),
+                            request.getTransformName(), request.getVersion());
             LOGGER.error(msg);
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
@@ -264,15 +244,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
     public Boolean update(TransformRequest request, UserInfo opInfo) {
         // check request and parameters
-        this.checkRequestParams(request);
-        // check record id
-        if (request.getId() == null) {
-            throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
+        this.chkUnmodifiableParams(request);
         // Check if it can be added
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
         if (groupEntity == null) {
@@ -289,17 +261,8 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         // check inlong group status
         GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
         if (GroupStatus.notAllowedUpdate(status)) {
-            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
-        }
-        StreamTransformEntity exist = transformMapper.selectById(request.getId());
-        if (exist == null) {
-            throw new BusinessException(ErrorCodeEnum.TRANSFORM_NOT_FOUND);
-        }
-        if (!exist.getVersion().equals(request.getVersion())) {
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
-                    String.format("transform has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
-                            request.getInlongGroupId(), request.getInlongStreamId(),
-                            request.getTransformName(), request.getVersion()));
+            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+                    String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
         }
         // update record
         StreamTransformEntity transformEntity =
@@ -352,22 +315,6 @@ public class StreamTransformServiceImpl implements StreamTransformService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
     public Boolean delete(DeleteTransformRequest request, UserInfo opInfo) {
-        // check request parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
-        }
-        // check group id
-        if (StringUtils.isBlank(request.getInlongGroupId())) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
-        }
-        // check stream id
-        if (StringUtils.isBlank(request.getInlongStreamId())) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
-        }
-        // check operator info
-        if (opInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
-        }
         // Check if it can be added
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
         if (groupEntity == null) {
@@ -384,7 +331,8 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         // check inlong group status
         GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
         if (GroupStatus.notAllowedUpdate(status)) {
-            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
+            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+                    String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
         }
         // query records
         List<StreamTransformEntity> entityList =
@@ -421,27 +369,44 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         Preconditions.checkNotNull(transformName, ErrorCodeEnum.TRANSFORM_NAME_IS_NULL.getMessage());
     }
 
-    private void checkRequestParams(TransformRequest request) {
-        // check request parameter
-        if (request == null) {
-            throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
+    private void chkUnmodifiableParams(TransformRequest request) {
+        StreamTransformEntity entity = transformMapper.selectById(request.getId());
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.TRANSFORM_NOT_FOUND);
         }
+        // check record version
+        Preconditions.chkNotEquals(entity.getVersion(), request.getVersion(),
+                ErrorCodeEnum.CONFIG_EXPIRED,
+                String.format("record has expired with record version=%d, request version=%d",
+                        entity.getVersion(), request.getVersion()));
         // check group id
-        if (StringUtils.isBlank(request.getInlongGroupId())) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        if (StringUtils.isNotBlank(request.getInlongGroupId())
+                && !entity.getInlongGroupId().equals(request.getInlongGroupId())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "inlongGroupId not allowed modify");
         }
         // check stream id
-        if (StringUtils.isBlank(request.getInlongStreamId())) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+        if (StringUtils.isNotBlank(request.getInlongStreamId())
+                && !entity.getInlongStreamId().equals(request.getInlongStreamId())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "inlongStreamId not allowed modify");
         }
         // check transform type
-        if (StringUtils.isBlank(request.getTransformType())) {
-            throw new BusinessException(ErrorCodeEnum.TRANSFORM_TYPE_IS_NULL);
+        if (StringUtils.isNotBlank(request.getTransformType())
+                && !entity.getTransformType().equals(request.getTransformType())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "transformType not allowed modify");
         }
         // check transform name
-        if (StringUtils.isBlank(request.getTransformName())) {
-            throw new BusinessException(ErrorCodeEnum.TRANSFORM_NAME_IS_NULL);
-        }
+        if (StringUtils.isNotBlank(request.getTransformName())
+                && !entity.getTransformName().equals(request.getTransformName())) {
+            throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+                    "transformName not allowed modify");
+        }
+        request.setInlongGroupId(entity.getInlongGroupId());
+        request.setInlongStreamId(entity.getInlongStreamId());
+        request.setTransformType(entity.getTransformType());
+        request.setTransformName(entity.getTransformName());
     }
 
     private void updateFieldOpt(StreamTransformEntity entity, List<StreamField> fieldList) {
@@ -455,11 +420,11 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         // Then batch save the source fields
         this.saveFieldOpt(entity, fieldList);
 
-        LOGGER.info("success to update transform field");
+        LOGGER.debug("success to update transform field");
     }
 
     private void saveFieldOpt(StreamTransformEntity entity, List<StreamField> fieldList) {
-        LOGGER.info("begin to save transform field={}", fieldList);
+        LOGGER.debug("begin to save transform field={}", fieldList);
         if (CollectionUtils.isEmpty(fieldList)) {
             return;
         }
@@ -488,6 +453,6 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         }
 
         transformFieldMapper.insertAll(entityList);
-        LOGGER.info("success to save transform fields");
+        LOGGER.debug("success to save transform fields");
     }
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
index 292a543d7..a53d9aff6 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -55,12 +57,16 @@ public class OpenStreamSourceController {
     @ApiOperation(value = "Get stream source")
     @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
     public Response<StreamSource> get(@PathVariable Integer id) {
+        Preconditions.checkNull(id, ErrorCodeEnum.INVALID_PARAMETER, "source id is empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sourceService.get(id, LoginUserUtils.getLoginUser()));
     }
 
     @RequestMapping(value = "/source/list", method = RequestMethod.GET)
     @ApiOperation(value = "List stream sources by paginating")
     public Response<PageResult<? extends StreamSource>> listByCondition(SourcePageRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sourceService.listByCondition(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -68,6 +74,8 @@ public class OpenStreamSourceController {
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save stream source")
     public Response<Integer> save(@Validated(SaveValidation.class) @RequestBody SourceRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sourceService.save(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -75,6 +83,8 @@ public class OpenStreamSourceController {
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update stream source")
     public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody SourceRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
         return Response.success(sourceService.update(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -83,6 +93,8 @@ public class OpenStreamSourceController {
     @ApiOperation(value = "Delete stream source")
     @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
     public Response<Boolean> delete(@PathVariable Integer id) {
+        Preconditions.checkNull(id, ErrorCodeEnum.INVALID_PARAMETER, "source id is empty"); // presumably rejects a null path id; confirm checkNull semantics
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY); // same guard on the login user that is handed to the service below
         return Response.success(sourceService.delete(id, LoginUserUtils.getLoginUser()));
     }
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
index 205c575dc..b9efd2a07 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.manager.web.controller.openapi;
 
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.transform.DeleteTransformRequest;
@@ -54,6 +56,8 @@ public class OpenStreamTransformController {
     @ApiOperation(value = "Get stream transform list")
     public Response<List<TransformResponse>> list(@RequestParam("inlongGroupId") String groupId,
             @RequestParam("inlongStreamId") String streamId) {
+        Preconditions.checkBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY); // NOTE(review): only groupId is checked here; streamId is forwarded unchecked - confirm this is intended
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY); // same guard on the login user that is handed to the service below
         return Response.success(streamTransformService.listTransform(
                 groupId, streamId, LoginUserUtils.getLoginUser()));
     }
@@ -62,6 +66,8 @@ public class OpenStreamTransformController {
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save stream transform")
     public Response<Integer> save(@Validated @RequestBody TransformRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty"); // presumably rejects a null transform request; confirm checkNull semantics
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY); // same guard on the login user that is handed to the service below
         return Response.success(
                 streamTransformService.save(request, LoginUserUtils.getLoginUser()));
     }
@@ -70,6 +76,8 @@ public class OpenStreamTransformController {
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update stream transform")
     public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody TransformRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty"); // presumably rejects a null transform request; confirm checkNull semantics
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY); // same guard on the login user that is handed to the service below
         return Response.success(streamTransformService.update(request, LoginUserUtils.getLoginUser()));
     }
 
@@ -77,6 +85,8 @@ public class OpenStreamTransformController {
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Delete stream transform")
     public Response<Boolean> delete(@Validated DeleteTransformRequest request) {
+        Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty"); // NOTE(review): this delete endpoint is logged as OperationType.UPDATE - confirm whether a delete type was intended
+        Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY); // same guard on the login user that is handed to the service below
         return Response.success(streamTransformService.delete(request, LoginUserUtils.getLoginUser()));
     }
 }