You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/21 09:45:25 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new a8ac85cf2 [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
a8ac85cf2 is described below

commit a8ac85cf2b2cb27b85f17b7004489a164540d6ea
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Sep 21 15:06:53 2022 +0800

    [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
---
 .../resource/sort/DefaultSortConfigOperator.java   | 67 +++++-----------------
 1 file changed, 14 insertions(+), 53 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index d9f8d37a7..53ece64dc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -80,14 +80,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
             return;
         }
 
-        GroupInfo configInfo;
-        // if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source
-        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
-            configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
-        } else {
-            configInfo = this.getStandardGroupInfo(groupInfo, streamInfos);
-        }
-
+        GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
         String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
         if (isStream) {
             this.addToStreamExt(streamInfos, dataflow);
@@ -100,7 +93,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         }
     }
 
-    private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+    private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
         // get source info
         Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
         // get sink info
@@ -117,6 +110,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
             Map<String, StreamField> fieldMap = new HashMap<>();
             inlongStream.getSourceList().forEach(
                     source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
+
             List<TransformResponse> transformResponseList = transformMap.get(streamId);
             if (CollectionUtils.isNotEmpty(transformResponseList)) {
                 transformResponseList.forEach(
@@ -124,59 +118,26 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
             }
 
             // build a stream info from the nodes and relations
-            List<Node> nodes = this.createNodesWithTransform(sourceMap.get(streamId),
-                    transformResponseList, sinkMap.get(streamId), fieldMap);
-            List<NodeRelation> relations = NodeRelationUtils.createNodeRelations(inlongStream);
+            List<StreamSource> sources = sourceMap.get(streamId);
+            List<StreamSink> sinks = sinkMap.get(streamId);
+            List<Node> nodes = this.createNodes(sources, transformResponseList, sinks, fieldMap);
+            List<NodeRelation> relations;
+            if (CollectionUtils.isEmpty(transformResponseList)) {
+                relations = NodeRelationUtils.createNodeRelations(sources, sinks);
+            } else {
+                relations = NodeRelationUtils.createNodeRelations(inlongStream);
+            }
             StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
             sortStreamInfos.add(streamInfo);
 
-            // rebuild joinerNode relation
+            // rebuild joinerNode relation if transformResponseList is not empty
             NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
         }
 
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
-    /**
-     * Get Sort GroupInfo of STANDARD inlong group.
-     *
-     * @see org.apache.inlong.sort.protocol.GroupInfo
-     */
-    private GroupInfo getStandardGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
-        // get source info
-        Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
-        // get sink info
-        Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
-
-        // create StreamInfo for Sort protocol
-        List<StreamInfo> sortStreamInfos = new ArrayList<>();
-        for (InlongStreamInfo inlongStream : streamInfoList) {
-            String streamId = inlongStream.getInlongStreamId();
-
-            Map<String, StreamField> fieldMap = new HashMap<>();
-            inlongStream.getSourceList().forEach(
-                    source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
-
-            List<StreamSource> sources = sourceMap.get(streamId);
-            List<StreamSink> sinks = sinkMap.get(streamId);
-            StreamInfo sortStream = new StreamInfo(streamId,
-                    this.createNodesWithoutTransform(sources, sinks, fieldMap),
-                    NodeRelationUtils.createNodeRelations(sources, sinks));
-            sortStreamInfos.add(sortStream);
-        }
-
-        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
-    }
-
-    private List<Node> createNodesWithoutTransform(List<StreamSource> sources, List<StreamSink> sinks,
-            Map<String, StreamField> constantFieldMap) {
-        List<Node> nodes = Lists.newArrayList();
-        nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
-        nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
-        return nodes;
-    }
-
-    private List<Node> createNodesWithTransform(List<StreamSource> sources, List<TransformResponse> transformResponses,
+    private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses,
             List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
         List<Node> nodes = Lists.newArrayList();
         nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));