You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/25 03:23:34 UTC

[inlong] branch master updated: [INLONG-6001][Manager] Ensure that the newly created sink under the successful stream can be configured normally (#6002)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a5d88aca [INLONG-6001][Manager] Ensure that the newly created sink under the successful stream can be configured normally (#6002)
2a5d88aca is described below

commit 2a5d88acab2a386843ba79658e757e31d6a6e9f7
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Sun Sep 25 11:23:29 2022 +0800

    [INLONG-6001][Manager] Ensure that the newly created sink under the successful stream can be configured normally (#6002)
---
 .../listener/sort/StreamSortConfigListener.java      | 18 ++++++------------
 .../manager/service/sink/StreamSinkServiceImpl.java  | 20 +++++++++++++++++++-
 2 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
index 72080935f..299d65eef 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/StreamSortConfigListener.java
@@ -18,15 +18,12 @@
 package org.apache.inlong.manager.service.listener.sort;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.resource.sort.SortConfigOperator;
@@ -62,18 +59,15 @@ public class StreamSortConfigListener implements SortOperateListener {
     @Override
     public boolean accept(WorkflowContext context) {
         ProcessForm processForm = context.getProcessForm();
+        String className = processForm.getClass().getSimpleName();
         String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof GroupResourceProcessForm)) {
-            LOGGER.warn("zookeeper enabled was [false] for groupId [{}]", groupId);
+        if (processForm instanceof StreamResourceProcessForm) {
+            LOGGER.info("accept sort config listener as the process is {} for groupId [{}]", className, groupId);
+            return true;
+        } else {
+            LOGGER.info("not accept sort config listener as the process is {} for groupId [{}]", className, groupId);
             return false;
         }
-
-        GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
-        InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
-        boolean enable = InlongConstants.ENABLE_ZK.equals(groupInfo.getEnableZookeeper())
-                && !MQType.NONE.equals(groupInfo.getMqType());
-        LOGGER.info("zookeeper enabled was [{}] for groupId [{}]", enable, groupId);
-        return enable;
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 8360dc4df..e2572651d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -99,6 +99,9 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         // Make sure that there is no sink info with the current groupId and streamId
         String streamId = request.getInlongStreamId();
         String sinkName = request.getSinkName();
+        // Check whether the stream exists or not
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
         List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
         for (StreamSinkEntity sinkEntity : sinkList) {
             if (sinkEntity != null && Objects.equals(sinkEntity.getSinkName(), sinkName)) {
@@ -115,7 +118,22 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             fields.forEach(sinkField -> sinkField.setId(null));
         }
         int id = sinkOperator.saveOpt(request, operator);
-
+        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
+        if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
+            SinkStatus nextStatus = SinkStatus.CONFIG_ING;
+            StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
+            sinkEntity.setStatus(nextStatus.getCode());
+            sinkMapper.updateStatus(sinkEntity);
+        }
+        // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
+        if (streamSuccess) {
+            // To work around the circular reference check, we manually instantiate and autowire the service
+            if (streamProcessOperation == null) {
+                streamProcessOperation = new InlongStreamProcessService();
+                autowireCapableBeanFactory.autowireBean(streamProcessOperation);
+            }
+            streamProcessOperation.startProcess(groupId, streamId, operator, false);
+        }
         LOGGER.info("success to save sink info: {}", request);
         return id;
     }