You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2023/01/29 01:52:55 UTC
[inlong] branch master updated: [INLONG-7261][Manager] Optimize OpenStreamTransformController implementation (#7262)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5e2718778 [INLONG-7261][Manager] Optimize OpenStreamTransformController implementation (#7262)
5e2718778 is described below
commit 5e27187788b382e7ea8e1843be88980557148e42
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Jan 29 09:52:50 2023 +0800
[INLONG-7261][Manager] Optimize OpenStreamTransformController implementation (#7262)
---
.../manager/common/validation/SaveValidation.java | 2 +-
.../pojo/transform/DeleteTransformRequest.java | 15 ++-
.../manager/pojo/transform/TransformRequest.java | 20 ++--
.../service/source/StreamSourceServiceImpl.java | 62 ++--------
.../transform/StreamTransformServiceImpl.java | 129 ++++++++-------------
.../openapi/OpenStreamSourceController.java | 12 ++
.../openapi/OpenStreamTransformController.java | 10 ++
7 files changed, 101 insertions(+), 149 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/validation/SaveValidation.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/validation/SaveValidation.java
index 832af0a0c..d5922e352 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/validation/SaveValidation.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/validation/SaveValidation.java
@@ -22,7 +22,7 @@ import javax.validation.groups.Default;
/**
* Used for validate add request fields group
*
- * @see UpdateValidation
+ * @see SaveValidation
*/
public interface SaveValidation extends Default {
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/DeleteTransformRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/DeleteTransformRequest.java
index 564dffcd0..20e176fdc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/DeleteTransformRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/DeleteTransformRequest.java
@@ -20,8 +20,9 @@ package org.apache.inlong.manager.pojo.transform;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-
+import org.hibernate.validator.constraints.Length;
import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
/**
* Delete request of transform
@@ -30,16 +31,22 @@ import javax.validation.constraints.NotBlank;
@ApiModel("Delete request of stream transform")
public class DeleteTransformRequest {
- @NotBlank(message = "inlongGroupId cannot be blank")
@ApiModelProperty("Inlong group id")
+ @NotBlank(message = "inlongGroupId cannot be blank")
+ @Length(min = 4, max = 100, message = "length must be between 4 and 100")
+ @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
private String inlongGroupId;
- @NotBlank(message = "inlongStreamId cannot be blank")
@ApiModelProperty("Inlong stream id")
+ @NotBlank(message = "inlongStreamId cannot be blank")
+ @Length(min = 4, max = 100, message = "inlongStreamId length must be between 4 and 100")
+ @Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only supports lowercase letters, numbers, '-', or '_'")
private String inlongStreamId;
- @NotBlank(message = "transformName cannot be blank")
@ApiModelProperty("Transform name, unique in one stream")
+ @NotBlank(message = "transformName cannot be blank")
+ @Length(min = 1, max = 100, message = "transformName length must be between 1 and 100")
+ @Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "transformName only supports lowercase letters, numbers, '-', or '_'")
private String transformName;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java
index 5895d5d44..a57679a54 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformRequest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.pojo.transform;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.hibernate.validator.constraints.Length;
@@ -36,49 +37,50 @@ import java.util.List;
@ApiModel("Stream transform request")
public class TransformRequest {
- @NotNull(groups = UpdateValidation.class)
@ApiModelProperty(value = "Primary key")
+ @NotNull(groups = UpdateValidation.class, message = "id cannot be null")
private Integer id;
- @NotBlank(message = "inlongGroupId cannot be blank")
@ApiModelProperty("Inlong group id")
+ @NotBlank(groups = SaveValidation.class, message = "inlongGroupId cannot be blank")
@Length(min = 4, max = 100, message = "length must be between 4 and 100")
@Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "only supports lowercase letters, numbers, '-', or '_'")
private String inlongGroupId;
- @NotBlank(message = "inlongStreamId cannot be blank")
@ApiModelProperty("Inlong stream id")
+ @NotBlank(groups = SaveValidation.class, message = "inlongStreamId cannot be blank")
@Length(min = 4, max = 100, message = "inlongStreamId length must be between 4 and 100")
@Pattern(regexp = "^[a-z0-9_-]{4,100}$", message = "inlongStreamId only supports lowercase letters, numbers, '-', or '_'")
private String inlongStreamId;
- @NotBlank(message = "transformName cannot be blank")
+ @ApiModelProperty("Transform name, unique in one stream")
+ @NotBlank(groups = SaveValidation.class, message = "transformName cannot be blank")
@Length(min = 1, max = 100, message = "transformName length must be between 1 and 100")
@Pattern(regexp = "^[a-z0-9_-]{1,100}$", message = "transformName only supports lowercase letters, numbers, '-', or '_'")
- @ApiModelProperty("Transform name, unique in one stream")
private String transformName;
- @NotBlank(message = "transformType cannot be blank")
@ApiModelProperty("Transform type, including: splitter, filter, joiner, etc.")
+ @NotBlank(groups = SaveValidation.class, message = "transformType cannot be blank")
@Length(max = 15, message = "length must be less than or equal to 15")
private String transformType;
- @NotBlank(message = "preNodeNames cannot be blank")
@ApiModelProperty("Pre node names of transform in this stream, join by ','")
+ @NotBlank(message = "preNodeNames cannot be blank")
@Length(min = 1, max = 1638400, message = "length must be between 1 and 1638400")
private String preNodeNames;
- @NotBlank(message = "postNodeNames cannot be blank")
@ApiModelProperty("Post node names of transform in this stream, join by ','")
+ @NotBlank(message = "postNodeNames cannot be blank")
@Length(min = 1, max = 1638400, message = "length must be between 1 and 1638400")
private String postNodeNames;
- @NotBlank(message = "transformDefinition cannot be blank")
@ApiModelProperty("Transform definition in json type")
+ @NotBlank(message = "transformDefinition cannot be blank")
@Length(min = 1, max = 1638400, message = "length must be between 1 and 1638400")
private String transformDefinition;
@ApiModelProperty("Version of transform")
+ @NotNull(groups = UpdateValidation.class, message = "version cannot be null")
private Integer version;
@ApiModelProperty(value = "Field list")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 8f991bb47..0e7c3c355 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -65,7 +65,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -124,14 +123,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
public Integer save(SourceRequest request, UserInfo opInfo) {
- // check request parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// Check if it can be added
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
if (groupEntity == null) {
@@ -195,14 +186,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
public StreamSource get(Integer id, UserInfo opInfo) {
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
- // check source id
- if (id == null) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "source id is empty");
- }
StreamSourceEntity entity = sourceMapper.selectById(id);
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND,
@@ -298,17 +281,9 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
public PageResult<? extends StreamSource> listByCondition(SourcePageRequest request, UserInfo opInfo) {
- // check request parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
- }
if (StringUtils.isBlank(request.getInlongGroupId())) {
throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
}
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
PageHelper.startPage(request.getPageNum(), request.getPageSize());
OrderFieldEnum.checkOrderField(request);
OrderTypeEnum.checkOrderType(request);
@@ -361,7 +336,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
public Boolean update(SourceRequest request, String operator) {
LOGGER.info("begin to update source info: {}", request);
// check request parameter
- checkRequestParams(request);
+ chkUnmodifiableParams(request);
// Check if it can be modified
String groupId = request.getInlongGroupId();
@@ -382,12 +357,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
public Boolean update(SourceRequest request, UserInfo opInfo) {
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// check request parameter
- checkRequestParams(request);
+ chkUnmodifiableParams(request);
// Check if it can be update
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
if (groupEntity == null) {
@@ -466,13 +437,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
public Boolean delete(Integer id, UserInfo opInfo) {
- if (id == null) {
- throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
- }
- // check opInfo
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
if (entity == null) {
return true;
@@ -634,11 +598,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
Preconditions.checkNotNull(sourceName, ErrorCodeEnum.SOURCE_NAME_IS_NULL.getMessage());
}
- private void checkRequestParams(SourceRequest request) {
- // check request parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
- }
+ private void chkUnmodifiableParams(SourceRequest request) {
// check record exists
StreamSourceEntity entity = sourceMapper.selectById(request.getId());
if (entity == null) {
@@ -646,17 +606,13 @@ public class StreamSourceServiceImpl implements StreamSourceService {
String.format("not found source record by id=%d", request.getId()));
}
// check whether modify sourceType
- if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
- throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
- "sourceType not allowed modify");
- }
+ Preconditions.chkNotEquals(entity.getSourceType(), request.getSourceType(),
+ ErrorCodeEnum.INVALID_PARAMETER, "sourceType not allowed modify");
// check record version
- if (!Objects.equals(entity.getVersion(), request.getVersion())) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("source has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
- request.getInlongGroupId(), request.getInlongStreamId(), request.getSourceName(),
- request.getVersion()));
- }
+ Preconditions.chkNotEquals(entity.getVersion(), request.getVersion(),
+ ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("record has expired with record version=%d, request version=%d",
+ entity.getVersion(), request.getVersion()));
// check whether modify groupId
if (StringUtils.isNotBlank(request.getInlongGroupId())
&& !entity.getInlongGroupId().equals(request.getInlongGroupId())) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 89fb0a389..2e27e96a8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -100,12 +100,6 @@ public class StreamTransformServiceImpl implements StreamTransformService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
public Integer save(TransformRequest request, UserInfo opInfo) {
- // check request and parameters
- this.checkRequestParams(request);
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// Check if it can be added
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
if (groupEntity == null) {
@@ -122,7 +116,8 @@ public class StreamTransformServiceImpl implements StreamTransformService {
// check inlong group status
GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
if (GroupStatus.notAllowedUpdate(status)) {
- throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
+ throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+ String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
}
// Check if the record to be added exists
List<StreamTransformEntity> transformEntities =
@@ -145,7 +140,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
@Override
public List<TransformResponse> listTransform(String groupId, String streamId) {
- LOGGER.info("begin to fetch transform info by groupId={} and streamId={} ", groupId, streamId);
+ LOGGER.debug("begin to fetch transform info by groupId={} and streamId={} ", groupId, streamId);
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
List<StreamTransformEntity> entityList = transformMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isEmpty(entityList)) {
@@ -177,14 +172,6 @@ public class StreamTransformServiceImpl implements StreamTransformService {
@Override
public List<TransformResponse> listTransform(String groupId, String streamId, UserInfo opInfo) {
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
- // check group id
- if (StringUtils.isBlank(groupId)) {
- throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
- }
// Check if it can be added
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
if (groupEntity == null) {
@@ -231,28 +218,21 @@ public class StreamTransformServiceImpl implements StreamTransformService {
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
public Boolean update(TransformRequest request, String operator) {
LOGGER.info("begin to update transform info: {}", request);
- this.checkParams(request);
+ // check request and parameters
+ this.chkUnmodifiableParams(request);
// Check whether the transform can be modified
String groupId = request.getInlongGroupId();
groupCheckService.checkGroupStatus(groupId, operator);
Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
- StreamTransformEntity exist = transformMapper.selectById(request.getId());
- if (exist == null) {
- LOGGER.error("transform not found by id={}", request.getId());
- throw new BusinessException(ErrorCodeEnum.TRANSFORM_NOT_FOUND);
- }
- String msg = String.format("transform has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
- request.getInlongGroupId(), request.getInlongStreamId(),
- request.getTransformName(), request.getVersion());
- if (!exist.getVersion().equals(request.getVersion())) {
- LOGGER.error(msg);
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(request,
StreamTransformEntity::new);
transformEntity.setModifier(operator);
int rowCount = transformMapper.updateByIdSelective(transformEntity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+ String msg =
+ String.format("transform has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
+ request.getInlongGroupId(), request.getInlongStreamId(),
+ request.getTransformName(), request.getVersion());
LOGGER.error(msg);
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
@@ -264,15 +244,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
public Boolean update(TransformRequest request, UserInfo opInfo) {
// check request and parameters
- this.checkRequestParams(request);
- // check record id
- if (request.getId() == null) {
- throw new BusinessException(ErrorCodeEnum.ID_IS_EMPTY);
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
+ this.chkUnmodifiableParams(request);
// Check if it can be added
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
if (groupEntity == null) {
@@ -289,17 +261,8 @@ public class StreamTransformServiceImpl implements StreamTransformService {
// check inlong group status
GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
if (GroupStatus.notAllowedUpdate(status)) {
- throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
- }
- StreamTransformEntity exist = transformMapper.selectById(request.getId());
- if (exist == null) {
- throw new BusinessException(ErrorCodeEnum.TRANSFORM_NOT_FOUND);
- }
- if (!exist.getVersion().equals(request.getVersion())) {
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED,
- String.format("transform has already updated with groupId=%s, streamId=%s, name=%s, curVersion=%s",
- request.getInlongGroupId(), request.getInlongStreamId(),
- request.getTransformName(), request.getVersion()));
+ throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+ String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
}
// update record
StreamTransformEntity transformEntity =
@@ -352,22 +315,6 @@ public class StreamTransformServiceImpl implements StreamTransformService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
public Boolean delete(DeleteTransformRequest request, UserInfo opInfo) {
- // check request parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
- }
- // check group id
- if (StringUtils.isBlank(request.getInlongGroupId())) {
- throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
- }
- // check stream id
- if (StringUtils.isBlank(request.getInlongStreamId())) {
- throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
- }
- // check operator info
- if (opInfo == null) {
- throw new BusinessException(ErrorCodeEnum.LOGIN_USER_EMPTY);
- }
// Check if it can be added
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getInlongGroupId());
if (groupEntity == null) {
@@ -384,7 +331,8 @@ public class StreamTransformServiceImpl implements StreamTransformService {
// check inlong group status
GroupStatus status = GroupStatus.forCode(groupEntity.getStatus());
if (GroupStatus.notAllowedUpdate(status)) {
- throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
+ throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS,
+ String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
}
// query records
List<StreamTransformEntity> entityList =
@@ -421,27 +369,44 @@ public class StreamTransformServiceImpl implements StreamTransformService {
Preconditions.checkNotNull(transformName, ErrorCodeEnum.TRANSFORM_NAME_IS_NULL.getMessage());
}
- private void checkRequestParams(TransformRequest request) {
- // check request parameter
- if (request == null) {
- throw new BusinessException(ErrorCodeEnum.REQUEST_IS_EMPTY);
+ private void chkUnmodifiableParams(TransformRequest request) {
+ StreamTransformEntity entity = transformMapper.selectById(request.getId());
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.TRANSFORM_NOT_FOUND);
}
+ // check record version
+ Preconditions.chkNotEquals(entity.getVersion(), request.getVersion(),
+ ErrorCodeEnum.CONFIG_EXPIRED,
+ String.format("record has expired with record version=%d, request version=%d",
+ entity.getVersion(), request.getVersion()));
// check group id
- if (StringUtils.isBlank(request.getInlongGroupId())) {
- throw new BusinessException(ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+ if (StringUtils.isNotBlank(request.getInlongGroupId())
+ && !entity.getInlongGroupId().equals(request.getInlongGroupId())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "inlongGroupId not allowed modify");
}
// check stream id
- if (StringUtils.isBlank(request.getInlongStreamId())) {
- throw new BusinessException(ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+ if (StringUtils.isNotBlank(request.getInlongStreamId())
+ && !entity.getInlongStreamId().equals(request.getInlongStreamId())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "inlongStreamId not allowed modify");
}
// check transform type
- if (StringUtils.isBlank(request.getTransformType())) {
- throw new BusinessException(ErrorCodeEnum.TRANSFORM_TYPE_IS_NULL);
+ if (StringUtils.isNotBlank(request.getTransformType())
+ && !entity.getTransformType().equals(request.getTransformType())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "transformType not allowed modify");
}
// check transform name
- if (StringUtils.isBlank(request.getTransformName())) {
- throw new BusinessException(ErrorCodeEnum.TRANSFORM_NAME_IS_NULL);
- }
+ if (StringUtils.isNotBlank(request.getTransformName())
+ && !entity.getTransformName().equals(request.getTransformName())) {
+ throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER,
+ "transformName not allowed modify");
+ }
+ request.setInlongGroupId(entity.getInlongGroupId());
+ request.setInlongStreamId(entity.getInlongStreamId());
+ request.setTransformType(entity.getTransformType());
+ request.setTransformName(entity.getTransformName());
}
private void updateFieldOpt(StreamTransformEntity entity, List<StreamField> fieldList) {
@@ -455,11 +420,11 @@ public class StreamTransformServiceImpl implements StreamTransformService {
// Then batch save the source fields
this.saveFieldOpt(entity, fieldList);
- LOGGER.info("success to update transform field");
+ LOGGER.debug("success to update transform field");
}
private void saveFieldOpt(StreamTransformEntity entity, List<StreamField> fieldList) {
- LOGGER.info("begin to save transform field={}", fieldList);
+ LOGGER.debug("begin to save transform field={}", fieldList);
if (CollectionUtils.isEmpty(fieldList)) {
return;
}
@@ -488,6 +453,6 @@ public class StreamTransformServiceImpl implements StreamTransformService {
}
transformFieldMapper.insertAll(entityList);
- LOGGER.info("success to save transform fields");
+ LOGGER.debug("success to save transform fields");
}
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
index 292a543d7..a53d9aff6 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
@@ -17,7 +17,9 @@
package org.apache.inlong.manager.web.controller.openapi;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -55,12 +57,16 @@ public class OpenStreamSourceController {
@ApiOperation(value = "Get stream source")
@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
public Response<StreamSource> get(@PathVariable Integer id) {
+ Preconditions.checkNull(id, ErrorCodeEnum.INVALID_PARAMETER, "source id is empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sourceService.get(id, LoginUserUtils.getLoginUser()));
}
@RequestMapping(value = "/source/list", method = RequestMethod.GET)
@ApiOperation(value = "List stream sources by paginating")
public Response<PageResult<? extends StreamSource>> listByCondition(SourcePageRequest request) {
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sourceService.listByCondition(request, LoginUserUtils.getLoginUser()));
}
@@ -68,6 +74,8 @@ public class OpenStreamSourceController {
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save stream source")
public Response<Integer> save(@Validated(SaveValidation.class) @RequestBody SourceRequest request) {
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sourceService.save(request, LoginUserUtils.getLoginUser()));
}
@@ -75,6 +83,8 @@ public class OpenStreamSourceController {
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update stream source")
public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody SourceRequest request) {
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sourceService.update(request, LoginUserUtils.getLoginUser()));
}
@@ -83,6 +93,8 @@ public class OpenStreamSourceController {
@ApiOperation(value = "Delete stream source")
@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
public Response<Boolean> delete(@PathVariable Integer id) {
+ Preconditions.checkNull(id, ErrorCodeEnum.INVALID_PARAMETER, "source id is empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(sourceService.delete(id, LoginUserUtils.getLoginUser()));
}
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
index 205c575dc..b9efd2a07 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamTransformController.java
@@ -17,7 +17,9 @@
package org.apache.inlong.manager.web.controller.openapi;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.UpdateValidation;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.transform.DeleteTransformRequest;
@@ -54,6 +56,8 @@ public class OpenStreamTransformController {
@ApiOperation(value = "Get stream transform list")
public Response<List<TransformResponse>> list(@RequestParam("inlongGroupId") String groupId,
@RequestParam("inlongStreamId") String streamId) {
+ Preconditions.checkBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(streamTransformService.listTransform(
groupId, streamId, LoginUserUtils.getLoginUser()));
}
@@ -62,6 +66,8 @@ public class OpenStreamTransformController {
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save stream transform")
public Response<Integer> save(@Validated @RequestBody TransformRequest request) {
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(
streamTransformService.save(request, LoginUserUtils.getLoginUser()));
}
@@ -70,6 +76,8 @@ public class OpenStreamTransformController {
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update stream transform")
public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody TransformRequest request) {
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(streamTransformService.update(request, LoginUserUtils.getLoginUser()));
}
@@ -77,6 +85,8 @@ public class OpenStreamTransformController {
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Delete stream transform")
public Response<Boolean> delete(@Validated DeleteTransformRequest request) {
+ Preconditions.checkNull(request, ErrorCodeEnum.INVALID_PARAMETER, "request cannot be empty");
+ Preconditions.checkNull(LoginUserUtils.getLoginUser(), ErrorCodeEnum.LOGIN_USER_EMPTY);
return Response.success(streamTransformService.delete(request, LoginUserUtils.getLoginUser()));
}
}