You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/22 02:14:41 UTC

[inlong] branch master updated: [INLONG-5157][Manager] Failed to build Sort config as the relations is empty (#5168)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 390b161d4 [INLONG-5157][Manager] Failed to build Sort config as the relations is empty (#5168)
390b161d4 is described below

commit 390b161d4812d68ccae17546e889692eae82daa5
Author: healchow <he...@gmail.com>
AuthorDate: Fri Jul 22 10:14:37 2022 +0800

    [INLONG-5157][Manager] Failed to build Sort config as the relations is empty (#5168)
---
 .../service/sort/DefaultSortConfigOperator.java    | 103 ++++++++++++++-------
 .../manager/service/sort/util/LoadNodeUtils.java   |   6 +-
 .../service/sort/util/NodeRelationUtils.java       |  18 +++-
 3 files changed, 86 insertions(+), 41 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
index 38cdf5880..0045dc8e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
@@ -82,7 +82,12 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
 
         GroupInfo configInfo;
         // if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source
-        configInfo = getGroupInfo(groupInfo, streamInfos);
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
+        } else {
+            configInfo = this.getNormalGroupInfo(groupInfo, streamInfos);
+        }
+
         String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
         if (isStream) {
             this.addToStreamExt(streamInfos, dataflow);
@@ -95,14 +100,13 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         }
     }
 
-    private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+    private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
         // get source info
         Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
         // get sink info
-        String groupId = groupInfo.getInlongGroupId();
         Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
 
-        List<TransformResponse> transformResponses = transformService.listTransform(groupId, null);
+        List<TransformResponse> transformResponses = transformService.listTransform(groupInfo.getInlongGroupId(), null);
         Map<String, List<TransformResponse>> transformMap = transformResponses.stream()
                 .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
                         Collectors.toCollection(ArrayList::new)));
@@ -110,17 +114,19 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         List<StreamInfo> sortStreamInfos = new ArrayList<>();
         for (InlongStreamInfo inlongStream : streamInfoList) {
             String streamId = inlongStream.getInlongStreamId();
-            Map<String, StreamField> constantFieldMap = new HashMap<>();
-            inlongStream.getSourceList()
-                    .forEach(s -> parseConstantFieldMap(s.getSourceName(), s.getFieldList(), constantFieldMap));
+            Map<String, StreamField> fieldMap = new HashMap<>();
+            inlongStream.getSourceList().forEach(
+                    source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
             List<TransformResponse> transformResponseList = transformMap.get(streamId);
             if (CollectionUtils.isNotEmpty(transformResponseList)) {
-                transformResponseList
-                        .forEach(s -> parseConstantFieldMap(s.getTransformName(), s.getFieldList(), constantFieldMap));
+                transformResponseList.forEach(
+                        trans -> parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), fieldMap));
             }
-            List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
-                    transformResponseList, sinkMap.get(streamId), constantFieldMap);
-            List<NodeRelation> relations = NodeRelationUtils.createNodeRelationsForStream(inlongStream);
+
+            // build a stream info from the nodes and relations
+            List<Node> nodes = this.createNodesWithTransform(sourceMap.get(streamId),
+                    transformResponseList, sinkMap.get(streamId), fieldMap);
+            List<NodeRelation> relations = NodeRelationUtils.createNodeRelations(inlongStream);
             StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
             sortStreamInfos.add(streamInfo);
 
@@ -131,12 +137,60 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    /**
+     * Get Sort GroupInfo of normal inlong group.
+     *
+     * @see org.apache.inlong.sort.protocol.GroupInfo
+     */
+    private GroupInfo getNormalGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+        // get source info
+        Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
+        // get sink info
+        Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
+
+        // create StreamInfo for Sort protocol
+        List<StreamInfo> sortStreamInfos = new ArrayList<>();
+        for (InlongStreamInfo inlongStream : streamInfoList) {
+            String streamId = inlongStream.getInlongStreamId();
+
+            Map<String, StreamField> fieldMap = new HashMap<>();
+            inlongStream.getSourceList().forEach(
+                    source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
+
+            List<StreamSource> sources = sourceMap.get(streamId);
+            List<StreamSink> sinks = sinkMap.get(streamId);
+            StreamInfo sortStream = new StreamInfo(streamId,
+                    this.createNodesWithoutTransform(sources, sinks, fieldMap),
+                    NodeRelationUtils.createNodeRelations(sources, sinks));
+            sortStreamInfos.add(sortStream);
+        }
+
+        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
+    }
+
+    private List<Node> createNodesWithoutTransform(List<StreamSource> sources, List<StreamSink> sinks,
+            Map<String, StreamField> constantFieldMap) {
+        List<Node> nodes = Lists.newArrayList();
+        nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
+        nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
+        return nodes;
+    }
+
+    private List<Node> createNodesWithTransform(List<StreamSource> sources, List<TransformResponse> transformResponses,
+            List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
+        List<Node> nodes = Lists.newArrayList();
+        nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
+        nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
+        nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
+        return nodes;
+    }
+
     /**
      * Get constant field from stream fields
      *
-     * @param nodeId The node id
-     * @param fields The stream fields
-     * @param constantFieldMap The constant field map
+     * @param nodeId node id
+     * @param fields stream fields
+     * @param constantFieldMap constant field map
      */
     private void parseConstantFieldMap(String nodeId, List<StreamField> fields,
             Map<String, StreamField> constantFieldMap) {
@@ -150,25 +204,6 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         }
     }
 
-    private List<Node> createNodesForStream(List<StreamSource> sourceInfos,
-            List<TransformResponse> transformResponses, List<StreamSink> streamSinks,
-            Map<String, StreamField> constantFieldMap) {
-        List<Node> nodes = Lists.newArrayList();
-        nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceInfos));
-        nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
-        nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks, constantFieldMap));
-        return nodes;
-    }
-
-    private List<NodeRelation> createNodeRelationsForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
-        NodeRelation relation = new NodeRelation();
-        List<String> inputs = sources.stream().map(StreamSource::getSourceName).collect(Collectors.toList());
-        List<String> outputs = streamSinks.stream().map(StreamSink::getSinkName).collect(Collectors.toList());
-        relation.setInputs(inputs);
-        relation.setOutputs(outputs);
-        return Lists.newArrayList(relation);
-    }
-
     /**
      * Add config into inlong group ext info
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 49ad68231..88c88e6f6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -84,13 +84,13 @@ public class LoadNodeUtils {
     /**
      * Create nodes of data load.
      */
-    public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks,
-            Map<String, StreamField> constantFieldMap) {
+    public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks, Map<String, StreamField> fieldMap) {
         if (CollectionUtils.isEmpty(streamSinks)) {
             return Lists.newArrayList();
         }
         return streamSinks.stream()
-                .map(s -> LoadNodeUtils.createLoadNode(s, constantFieldMap)).collect(Collectors.toList());
+                .map(sink -> LoadNodeUtils.createLoadNode(sink, fieldMap))
+                .collect(Collectors.toList());
     }
 
     /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
index d5bafbcd5..5beb8ce69 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
@@ -64,9 +64,8 @@ public class NodeRelationUtils {
     /**
      * Create node relation for the given stream
      */
-    public static List<NodeRelation> createNodeRelationsForStream(InlongStreamInfo streamInfo) {
-        String tempView = streamInfo.getExtParams();
-        if (StringUtils.isEmpty(tempView)) {
+    public static List<NodeRelation> createNodeRelations(InlongStreamInfo streamInfo) {
+        if (StringUtils.isEmpty(streamInfo.getExtParams())) {
             log.warn("stream node relation is empty for {}", streamInfo);
             return Lists.newArrayList();
         }
@@ -79,6 +78,18 @@ public class NodeRelationUtils {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Create node relation from the given sources and sinks
+     */
+    public static List<NodeRelation> createNodeRelations(List<StreamSource> sources, List<StreamSink> sinks) {
+        NodeRelation relation = new NodeRelation();
+        List<String> inputs = sources.stream().map(StreamSource::getSourceName).collect(Collectors.toList());
+        List<String> outputs = sinks.stream().map(StreamSink::getSinkName).collect(Collectors.toList());
+        relation.setInputs(inputs);
+        relation.setOutputs(outputs);
+        return Lists.newArrayList(relation);
+    }
+
     /**
      * Optimize relation of node, JoinerRelation must be rebuilt.
      */
@@ -110,7 +121,6 @@ public class NodeRelationUtils {
                 String nodeName = outputs.get(0);
                 if (joinNodes.get(nodeName) != null) {
                     TransformDefinition transformDefinition = transformTypeMap.get(nodeName);
-                    TransformNode transformNode = joinNodes.get(nodeName);
                     joinRelations.add(getNodeRelation((JoinerDefinition) transformDefinition, relation));
                     shipIterator.remove();
                 }