You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/26 08:12:43 UTC

[inlong] branch master updated: [INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode (#6010)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5407d7f06 [INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode (#6010)
5407d7f06 is described below

commit 5407d7f0686aa53fd9e5b9e56b41aa38ae1711f3
Author: woofyzhao <49...@qq.com>
AuthorDate: Mon Sep 26 16:12:36 2022 +0800

    [INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode (#6010)
    
    * Change the transform source node name to be MQ node in standard mode
    
    * Reuse some params, and use an iterator to loop and modify the List
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../resource/sort/DefaultSortConfigOperator.java   | 91 ++++++++++++++++++----
 1 file changed, 74 insertions(+), 17 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 53ece64dc..6096777ac 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -17,23 +17,22 @@
 
 package org.apache.inlong.manager.service.resource.sort;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils;
+import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
+import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.transform.TransformResponse;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils;
-import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
-import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
-import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.service.transform.StreamTransformService;
 import org.apache.inlong.sort.protocol.GroupInfo;
@@ -47,8 +46,11 @@ import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -99,8 +101,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         // get sink info
         Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
 
-        List<TransformResponse> transformResponses = transformService.listTransform(groupInfo.getInlongGroupId(), null);
-        Map<String, List<TransformResponse>> transformMap = transformResponses.stream()
+        List<TransformResponse> transformList = transformService.listTransform(groupInfo.getInlongGroupId(), null);
+        Map<String, List<TransformResponse>> transformMap = transformList.stream()
                 .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
                         Collectors.toCollection(ArrayList::new)));
 
@@ -111,35 +113,90 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
             inlongStream.getSourceList().forEach(
                     source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
 
-            List<TransformResponse> transformResponseList = transformMap.get(streamId);
-            if (CollectionUtils.isNotEmpty(transformResponseList)) {
-                transformResponseList.forEach(
+            List<TransformResponse> transformResponses = transformMap.get(streamId);
+            if (CollectionUtils.isNotEmpty(transformResponses)) {
+                transformResponses.forEach(
                         trans -> parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), fieldMap));
             }
 
             // build a stream info from the nodes and relations
             List<StreamSource> sources = sourceMap.get(streamId);
             List<StreamSink> sinks = sinkMap.get(streamId);
-            List<Node> nodes = this.createNodes(sources, transformResponseList, sinks, fieldMap);
             List<NodeRelation> relations;
-            if (CollectionUtils.isEmpty(transformResponseList)) {
+            if (CollectionUtils.isEmpty(transformResponses)) {
                 relations = NodeRelationUtils.createNodeRelations(sources, sinks);
             } else {
                 relations = NodeRelationUtils.createNodeRelations(inlongStream);
+                // in standard mode, replace the upstream source node and the origin node of transform fields with the mq node
+                if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
+                    // mq node name, which is inlong stream id
+                    String mqNodeName = sources.get(0).getSourceName();
+                    Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses);
+                    adjustTransformField(transformResponses, nodeNameSet, mqNodeName);
+                    adjustNodeRelations(relations, nodeNameSet, mqNodeName);
+                }
             }
+
+            // create extract-transform-load nodes
+            List<Node> nodes = this.createNodes(sources, transformResponses, sinks, fieldMap);
+
             StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
             sortStreamInfos.add(streamInfo);
 
-            // rebuild joinerNode relation if transformResponseList is not empty
-            NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
+            // rebuild joinerNode relation
+            NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponses);
         }
 
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    /**
+     * Collect the distinct node names of the given sources and transforms (null or empty lists add nothing).
+     */
+    private Set<String> getInputNodeNames(List<StreamSource> sources, List<TransformResponse> transforms) {
+        Set<String> result = new HashSet<>();
+        if (CollectionUtils.isNotEmpty(sources)) {
+            result.addAll(sources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet()));
+        }
+        if (CollectionUtils.isNotEmpty(transforms)) {
+            result.addAll(transforms.stream().map(TransformResponse::getTransformName).collect(Collectors.toSet()));
+        }
+        return result;
+    }
+
+    /**
+     * Redirect transform fields to the MQ node when their origin node is not a known input node.
+     * Used for an InlongGroup in standard mode: any field whose origin node name is missing from
+     * {@code nodeNameSet} gets it overwritten in place with {@code mqNodeName}; other fields are untouched.
+     * NOTE(review): a null field list would throw here - presumably callers guarantee it is set; confirm.
+     */
+    private void adjustTransformField(List<TransformResponse> transforms, Set<String> nodeNameSet, String mqNodeName) {
+        for (TransformResponse transform : transforms) {
+            for (StreamField field : transform.getFieldList()) {
+                if (!nodeNameSet.contains(field.getOriginNodeName())) {
+                    field.setOriginNodeName(mqNodeName);
+                }
+            }
+        }
+    }
+
+    /**
+     * Replace, in place, every relation input that is not in {@code nodeNameSet} with {@code mqNodeName}.
+     */
+    private void adjustNodeRelations(List<NodeRelation> relations, Set<String> nodeNameSet, String mqNodeName) {
+        for (NodeRelation relation : relations) {
+            ListIterator<String> iterator = relation.getInputs().listIterator();
+            while (iterator.hasNext()) {
+                if (!nodeNameSet.contains(iterator.next())) {
+                    iterator.set(mqNodeName);
+                }
+            }
+        }
+    }
+
     private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses,
             List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
-        List<Node> nodes = Lists.newArrayList();
+        List<Node> nodes = new ArrayList<>();
         nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
         nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
         nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
@@ -170,7 +227,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
      */
     private void addToGroupExt(InlongGroupInfo groupInfo, String value) {
         if (groupInfo.getExtList() == null) {
-            groupInfo.setExtList(Lists.newArrayList());
+            groupInfo.setExtList(new ArrayList<>());
         }
 
         InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
@@ -188,7 +245,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     private void addToStreamExt(List<InlongStreamInfo> streamInfos, String value) {
         streamInfos.forEach(streamInfo -> {
             if (streamInfo.getExtList() == null) {
-                streamInfo.setExtList(Lists.newArrayList());
+                streamInfo.setExtList(new ArrayList<>());
             }
 
             InlongStreamExtInfo extInfo = new InlongStreamExtInfo();