Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/23 09:48:53 UTC

[GitHub] [inlong] woofyzhao commented on a diff in pull request #6010: [INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode

woofyzhao commented on code in PR #6010:
URL: https://github.com/apache/inlong/pull/6010#discussion_r978460623


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (TransformResponse transform : transforms) {
+            for (StreamField field : transform.getFieldList()) {
+                String originNodeName = field.getOriginNodeName();
+                if (!(validNameSet.contains(originNodeName))) {
+                    // in standard mode transform input node must either be mq source node or transform node,
+                    // otherwise replace it with mq node name, which should be stream id
+                    field.setOriginNodeName(mqNodeName);
+                }
+            }
+        }
+    }
+
+    private void adjustNodeRelations(List<NodeRelation> relations, InlongGroupInfo groupInfo,
+            List<StreamSource> sources, List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set relations' input node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();

Review Comment:
   In standard mode, each stream currently has exactly one MQ source, so `sources.get(0)` is that source.
   Its source name is the corresponding stream ID.
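
   To make the invariant concrete, here is a minimal sketch of the fallback logic with the single-source assumption checked explicitly. It is not the PR's code: plain strings stand in for `StreamSource`, `TransformResponse` and `StreamField`, and the class name `OriginNodeSketch` and method `normalizeOrigins` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the logic in the diff; strings replace the InLong types.
final class OriginNodeSketch {

    private OriginNodeSketch() {
    }

    /**
     * Returns a copy of originNodeNames in which every name that is neither the
     * MQ source name nor a transform name is replaced by the MQ source name.
     */
    static List<String> normalizeOrigins(List<String> mqSourceNames, List<String> transformNames,
            List<String> originNodeNames) {
        // Assumption from the review comment: exactly one MQ source per stream in standard mode.
        if (mqSourceNames.size() != 1) {
            throw new IllegalStateException("expected exactly one MQ source, got " + mqSourceNames.size());
        }
        // Per the review comment, this name should equal the stream ID.
        String mqNodeName = mqSourceNames.get(0);

        // Valid input nodes are the MQ source and every transform node.
        Set<String> validNames = new HashSet<>(transformNames);
        validNames.add(mqNodeName);

        List<String> result = new ArrayList<>(originNodeNames.size());
        for (String origin : originNodeNames) {
            result.add(validNames.contains(origin) ? origin : mqNodeName);
        }
        return result;
    }
}
```

   With MQ source `stream_1` and transform `t1`, an origin name that matches neither (for example a raw source name) is rewritten to `stream_1`, while `t1` and `stream_1` are left unchanged. Failing fast on an unexpected source count is a design choice of this sketch; the diff itself calls `sources.get(0)` without such a check.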


