You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/02 11:54:21 UTC

[inlong] branch master updated: [INLONG-5769][Manager] Fix the FILE type unsupported for SourceOperateListener (#5771)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 02732c069 [INLONG-5769][Manager] Fix the FILE type unsupported for SourceOperateListener (#5771)
02732c069 is described below

commit 02732c0698b25715bf2d68bbfe11a344f98e8100
Author: healchow <he...@gmail.com>
AuthorDate: Fri Sep 2 19:54:15 2022 +0800

    [INLONG-5769][Manager] Fix the FILE type unsupported for SourceOperateListener (#5771)
---
 .../source/AbstractSourceOperateListener.java      | 44 +++++-----------------
 1 file changed, 9 insertions(+), 35 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index fdcb430bb..7f1097e18 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -21,18 +21,12 @@ import com.google.common.collect.Lists;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaSourceRequest;
-import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
-import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
@@ -75,15 +69,16 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
         List<StreamSource> unOperatedSources = Lists.newArrayList();
         streamResponses.forEach(stream ->
                 operateStreamSources(groupId, stream.getInlongStreamId(), context.getOperator(), unOperatedSources));
+
         if (CollectionUtils.isNotEmpty(unOperatedSources)) {
-            GroupOperateType groupOperateType = getOperateType(context.getProcessForm());
-            StringBuilder builder = new StringBuilder("Unsupported operate ").append(groupOperateType).append(" for (");
+            GroupOperateType operateType = getOperateType(context.getProcessForm());
+            StringBuilder builder = new StringBuilder("Unsupported operate ").append(operateType).append(" for (");
             unOperatedSources.forEach(source -> builder.append(" ").append(source.getSourceName()).append(" "));
             String errMsg = builder.append(")").toString();
             throw new WorkflowListenerException(errMsg);
-        } else {
-            return ListenerResult.success();
         }
+
+        return ListenerResult.success();
     }
 
     /**
@@ -93,10 +88,8 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
             List<StreamSource> unOperatedSources) {
         List<StreamSource> sources = streamSourceService.listSource(groupId, streamId);
         sources.forEach(source -> {
-            boolean checkIfOp = checkIfOp(source, unOperatedSources);
-            if (checkIfOp) {
-                SourceRequest sourceRequest = createSourceRequest(source);
-                operateStreamSource(sourceRequest, operator);
+            if (checkIfOp(source, unOperatedSources)) {
+                operateStreamSource(source.genSourceRequest(), operator);
             }
         });
     }
@@ -130,25 +123,6 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
         return false;
     }
 
-    /**
-     * Creat source request by source type.
-     *
-     * @param streamSource source information
-     * @return source request
-     */
-    public SourceRequest createSourceRequest(StreamSource streamSource) {
-        String sourceType = streamSource.getSourceType();
-        switch (sourceType) {
-            case SourceType.MYSQL_BINLOG:
-                return CommonBeanUtils.copyProperties((MySQLBinlogSource) streamSource, MySQLBinlogSourceRequest::new);
-            case SourceType.KAFKA:
-                return CommonBeanUtils.copyProperties((KafkaSource) streamSource, KafkaSourceRequest::new);
-            default:
-                throw new IllegalArgumentException(
-                        String.format("Unsupported type=%s for SourceOperateListener", sourceType));
-        }
-    }
-
     /**
      * Operate stream sources ,such as delete, stop, restart.
      */
@@ -159,7 +133,7 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
             return ((GroupResourceProcessForm) processForm).getGroupOperateType();
         } else {
             log.error("illegal process form {} to get inlong group info", processForm.getFormName());
-            throw new RuntimeException("Unsupported ProcessForm " + processForm.getFormName());
+            throw new RuntimeException("Unsupported process form " + processForm.getFormName());
         }
     }
 
@@ -169,7 +143,7 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
             return groupResourceProcessForm.getGroupInfo();
         } else {
             log.error("illegal process form {} to get inlong group info", processForm.getFormName());
-            throw new RuntimeException("Unsupported ProcessForm " + processForm.getFormName());
+            throw new RuntimeException("Unsupported process form " + processForm.getFormName());
         }
     }