You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/15 12:20:55 UTC
[inlong] branch master updated: [INLONG-6544][Manager] Should update the source status after the inlong stream is completed (#6547)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e64cbd96c [INLONG-6544][Manager] Should update the source status after the inlong stream is completed (#6547)
e64cbd96c is described below
commit e64cbd96cf5a8771c3f617531230e26f33508e10
Author: healchow <he...@gmail.com>
AuthorDate: Tue Nov 15 20:20:48 2022 +0800
[INLONG-6544][Manager] Should update the source status after the inlong stream is completed (#6547)
---
.../listener/group/InitGroupCompleteListener.java | 46 ++++++------
.../stream/InitStreamCompleteListener.java | 21 +++---
.../resource/sort/DefaultSortConfigOperator.java | 21 ------
.../service/stream/InlongStreamService.java | 11 ++-
.../service/stream/InlongStreamServiceImpl.java | 87 +++++-----------------
5 files changed, 67 insertions(+), 119 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index a16562fd8..bd848f996 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -72,30 +72,34 @@ public class InitGroupCompleteListener implements ProcessEventListener {
String groupId = form.getInlongGroupId();
log.info("begin to execute InitGroupCompleteListener for groupId={}", groupId);
- // update inlong group status and other info
- InlongGroupInfo groupInfo = form.getGroupInfo();
- String operator = context.getOperator();
- groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator);
- if (InlongGroupUtils.isBatchTask(form.getGroupInfo())) {
- groupService.updateStatus(groupId, GroupStatus.FINISH.getCode(), operator);
- }
- InlongGroupEntity existGroup = groupMapper.selectByGroupId(groupId);
- InlongGroupRequest updateGroupRequest = groupInfo.genRequest();
- updateGroupRequest.setVersion(existGroup.getVersion());
- groupService.update(updateGroupRequest, operator);
+ try {
+ // update inlong group status and other info
+ InlongGroupInfo groupInfo = form.getGroupInfo();
+ String operator = context.getOperator();
+ Integer nextStatus = InlongGroupUtils.isBatchTask(form.getGroupInfo())
+ ? GroupStatus.FINISH.getCode() : GroupStatus.CONFIG_SUCCESSFUL.getCode();
+ groupService.updateStatus(groupId, nextStatus, operator);
+
+ InlongGroupEntity existGroup = groupMapper.selectByGroupId(groupId);
+ InlongGroupRequest updateGroupRequest = groupInfo.genRequest();
+ updateGroupRequest.setVersion(existGroup.getVersion());
+ groupService.update(updateGroupRequest, operator);
- // update status of other related configs
- if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) {
- streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
- if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
- sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
- } else {
- sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+ // update status of other related configs
+ if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) {
+ streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
+ if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+ sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+ } else {
+ sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+ }
}
- }
- log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);
- return ListenerResult.success();
+ log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);
+ return ListenerResult.success();
+ } catch (Exception e) {
+ throw new WorkflowListenerException(e);
+ }
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
index 141635c46..2796bde0b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
@@ -61,16 +61,19 @@ public class InitStreamCompleteListener implements ProcessEventListener {
final String streamId = streamInfo.getInlongStreamId();
final String operator = context.getOperator();
- // Update status of other related configs
- streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
- streamService.update(streamInfo.genRequest(), operator);
- if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) {
- sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator);
- } else {
- sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+ try {
+ // Update status of other related configs
+ streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
+ streamService.updateWithoutCheck(streamInfo.genRequest(), operator);
+ if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) {
+ sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+ } else {
+ sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+ }
+ return ListenerResult.success();
+ } catch (Exception e) {
+ throw new WorkflowListenerException(e);
}
-
- return ListenerResult.success();
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 08ec23226..2664cee1e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
@@ -242,24 +241,4 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
groupInfo.getExtList().add(extInfo);
}
- /**
- * Add config into inlong stream ext info
- */
- private void addToStreamExt(List<InlongStreamInfo> streamInfos, String value) {
- streamInfos.forEach(streamInfo -> {
- if (streamInfo.getExtList() == null) {
- streamInfo.setExtList(new ArrayList<>());
- }
-
- InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
- extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
- extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
- extInfo.setKeyName(InlongConstants.DATAFLOW);
- extInfo.setKeyValue(value);
-
- streamInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName()));
- streamInfo.getExtList().add(extInfo);
- });
- }
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 3d17b3972..5a9b5b144 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -94,7 +94,7 @@ public interface InlongStreamService {
List<InlongStreamBriefInfo> listBriefWithSink(String groupId);
/**
- * InlongStream info that needs to be modified
+ * Update the InlongStream info
*
* @param request inlong stream info that needs to be modified
* @param operator Edit person's name
@@ -102,6 +102,15 @@ public interface InlongStreamService {
*/
Boolean update(InlongStreamRequest request, String operator);
+ /**
+ * Update the InlongStream - not check the InlongGroup status to which the stream belongs.
+ *
+ * @param request inlong stream info that needs to be modified
+ * @param operator Edit person's name
+ * @return whether succeed
+ */
+ Boolean updateWithoutCheck(InlongStreamRequest request, String operator);
+
/**
* Delete the specified inlong stream.
* <p/>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index d8a49c30b..93146e278 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -59,7 +59,6 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -281,8 +280,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
return briefInfoList;
}
- @Transactional(rollbackFor = Throwable.class)
@Override
+ @Transactional(rollbackFor = Throwable.class)
public Boolean update(InlongStreamRequest request, String operator) {
LOGGER.debug("begin to update inlong stream info={}", request);
Preconditions.checkNotNull(request, "inlong stream request is empty");
@@ -292,22 +291,30 @@ public class InlongStreamServiceImpl implements InlongStreamService {
Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be modified
- InlongGroupEntity inlongGroupEntity = this.checkGroupStatusIsTemp(groupId);
+ this.checkGroupStatusIsTemp(groupId);
+
+ return this.updateWithoutCheck(request, operator);
+ }
- // Make sure the stream was exists
+ @Override
+ @Transactional(rollbackFor = Throwable.class)
+ public Boolean updateWithoutCheck(InlongStreamRequest request, String operator) {
+ LOGGER.debug("begin to update inlong stream without check, request={}", request);
+ // make sure the stream was exists
+ String groupId = request.getInlongGroupId();
+ String streamId = request.getInlongStreamId();
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
if (streamEntity == null) {
LOGGER.error("inlong stream not found by groupId={}, streamId={}", groupId, streamId);
throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
}
- String errMsg = String.format("stream has already updated with group id=%s, stream id=%s, curVersion=%s",
+
+ String errMsg = String.format("stream has already updated with groupId=%s, streamId=%s, curVersion=%s",
streamEntity.getInlongGroupId(), streamEntity.getInlongStreamId(), request.getVersion());
if (!Objects.equals(streamEntity.getVersion(), request.getVersion())) {
LOGGER.error(errMsg);
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
- // Check whether the current inlong group status supports modification
- this.doUpdateCheck(inlongGroupEntity.getStatus(), streamEntity, request);
CommonBeanUtils.copyProperties(request, streamEntity, true);
streamEntity.setModifier(operator);
@@ -316,13 +323,12 @@ public class InlongStreamServiceImpl implements InlongStreamService {
LOGGER.error(errMsg);
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
- // Update field information
+ // update stream fields
updateField(groupId, streamId, request.getFieldList());
- // Update extension info
- List<InlongStreamExtInfo> extInfos = request.getExtList();
- saveOrUpdateExt(groupId, streamId, extInfos);
+ // update stream extension infos
+ saveOrUpdateExt(groupId, streamId, request.getExtList());
- LOGGER.info("success to update inlong stream for groupId={}", groupId);
+ LOGGER.info("success to update inlong stream without check for groupId={} streamId={}", groupId, streamId);
return true;
}
@@ -465,7 +471,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public boolean updateStatus(String groupId, String streamId, Integer status, String operator) {
- LOGGER.debug("begin to update status by groupId={}, streamId={}", groupId, streamId);
streamMapper.updateStatusByIdentifier(groupId, streamId, status, operator);
LOGGER.info("success to update stream after approve for groupId=" + groupId + ", streamId=" + streamId);
return true;
@@ -563,66 +568,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
private InlongGroupEntity checkGroupStatusIsTemp(String groupId) {
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
Preconditions.checkNotNull(entity, "groupId is invalid");
- // Add/modify/delete is not allowed under certain inlong group status
+ // Add/modify/delete is not allowed under temporary inlong group status
GroupStatus curState = GroupStatus.forCode(entity.getStatus());
if (GroupStatus.isTempStatus(curState)) {
- LOGGER.error("inlong group status was not allowed to add/update/delete inlong stream");
+ LOGGER.error("inlong groupId={} status={} was not allowed to add/update/delete stream", groupId, curState);
throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED);
}
return entity;
}
- /**
- * Verify the fields that cannot be modified in the current inlong group status
- *
- * @param groupStatus Inlong group status
- * @param streamEntity Original inlong stream entity
- * @param request New inlong stream information
- */
- private void doUpdateCheck(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) {
- if (streamEntity == null || request == null) {
- return;
- }
-
- // Fields that are not allowed to be modified when the inlong group [configuration is successful]
- if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupStatus)) {
- checkUpdatedFields(streamEntity, request);
- }
-
- // Inlong group [Waiting to submit] [Approval rejected] [Configuration failed], if there is a
- // stream source/stream sink, the fields that are not allowed to be modified
- List<Integer> statusList = Arrays.asList(
- GroupStatus.TO_BE_SUBMIT.getCode(),
- GroupStatus.APPROVE_REJECTED.getCode(),
- GroupStatus.CONFIG_FAILED.getCode());
- if (statusList.contains(groupStatus)) {
- String groupId = request.getInlongGroupId();
- String streamId = request.getInlongStreamId();
- // Whether there is undeleted stream source and sink
- int sourceCount = sourceService.getCount(groupId, streamId);
- int sinkCount = sinkService.getCount(groupId, streamId);
- if (sourceCount > 0 || sinkCount > 0) {
- checkUpdatedFields(streamEntity, request);
- }
- }
- }
-
- /**
- * Check that groupId, streamId are not allowed to be modified
- */
- private void checkUpdatedFields(InlongStreamEntity streamEntity, InlongStreamRequest request) {
- String newGroupId = request.getInlongGroupId();
- if (newGroupId != null && !newGroupId.equals(streamEntity.getInlongGroupId())) {
- LOGGER.error("current status was not allowed to update inlong group id");
- throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED);
- }
-
- String newStreamId = request.getInlongStreamId();
- if (newStreamId != null && !newStreamId.equals(streamEntity.getInlongStreamId())) {
- LOGGER.error("current status was not allowed to update inlong stream id");
- throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED);
- }
- }
-
}