You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/25 03:23:34 UTC
[inlong] branch master updated: [INLONG-6001][Manager] Ensure that the newly created sink under the successful stream can be configured normally (#6002)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2a5d88aca [INLONG-6001][Manager] Ensure that the newly created sink under the successful stream can be configured normally (#6002)
2a5d88aca is described below
commit 2a5d88acab2a386843ba79658e757e31d6a6e9f7
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Sun Sep 25 11:23:29 2022 +0800
[INLONG-6001][Manager] Ensure that the newly created sink under the successful stream can be configured normally (#6002)
---
.../listener/sort/StreamSortConfigListener.java | 18 ++++++------------
.../manager/service/sink/StreamSinkServiceImpl.java | 20 +++++++++++++++++++-
2 files changed, 25 insertions(+), 13 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
index 72080935f..299d65eef 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
@@ -18,15 +18,12 @@
package org.apache.inlong.manager.service.listener.sort;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
@@ -62,18 +59,15 @@ public class StreamSortConfigListener implements SortOperateListener {
@Override
public boolean accept(WorkflowContext context) {
ProcessForm processForm = context.getProcessForm();
+ String className = processForm.getClass().getSimpleName();
String groupId = processForm.getInlongGroupId();
- if (!(processForm instanceof GroupResourceProcessForm)) {
- LOGGER.warn("zookeeper enabled was [false] for groupId [{}]", groupId);
+ if (processForm instanceof StreamResourceProcessForm) {
+ LOGGER.info("accept sort config listener as the process is {} for groupId [{}]", className, groupId);
+ return true;
+ } else {
+ LOGGER.info("not accept sort config listener as the process is {} for groupId [{}]", className, groupId);
return false;
}
-
- GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
- InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
- boolean enable = InlongConstants.ENABLE_ZK.equals(groupInfo.getEnableZookeeper())
- && !MQType.NONE.equals(groupInfo.getMqType());
- LOGGER.info("zookeeper enabled was [{}] for groupId [{}]", enable, groupId);
- return enable;
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 8360dc4df..e2572651d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -99,6 +99,9 @@ public class StreamSinkServiceImpl implements StreamSinkService {
// Make sure that there is no sink info with the current groupId and streamId
String streamId = request.getInlongStreamId();
String sinkName = request.getSinkName();
+ // Check whether the stream exists or not
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
for (StreamSinkEntity sinkEntity : sinkList) {
if (sinkEntity != null && Objects.equals(sinkEntity.getSinkName(), sinkName)) {
@@ -115,7 +118,22 @@ public class StreamSinkServiceImpl implements StreamSinkService {
fields.forEach(sinkField -> sinkField.setId(null));
}
int id = sinkOperator.saveOpt(request, operator);
-
+ boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
+ if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
+ SinkStatus nextStatus = SinkStatus.CONFIG_ING;
+ StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
+ sinkEntity.setStatus(nextStatus.getCode());
+ sinkMapper.updateStatus(sinkEntity);
+ }
+ // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
+ if (streamSuccess) {
+ // To work around the circular reference check we manually instantiate and wire
+ if (streamProcessOperation == null) {
+ streamProcessOperation = new InlongStreamProcessService();
+ autowireCapableBeanFactory.autowireBean(streamProcessOperation);
+ }
+ streamProcessOperation.startProcess(groupId, streamId, operator, false);
+ }
LOGGER.info("success to save sink info: {}", request);
return id;
}