You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/23 09:44:33 UTC

[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6010: [INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode

yunqingmoswu commented on code in PR #6010:
URL: https://github.com/apache/inlong/pull/6010#discussion_r978450125


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (TransformResponse transform : transforms) {
+            for (StreamField field : transform.getFieldList()) {
+                String originNodeName = field.getOriginNodeName();
+                if (!(validNameSet.contains(originNodeName))) {
+                    // in standard mode transform input node must either be mq source node or transform node,
+                    // otherwise replace it with mq node name, which should be stream id
+                    field.setOriginNodeName(mqNodeName);
+                }
+            }
+        }
+    }
+
+    private void adjustNodeRelations(List<NodeRelation> relations, InlongGroupInfo groupInfo,
+            List<StreamSource> sources, List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set relations' input node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (NodeRelation relation : relations) {
+            List<String> inputs = relation.getInputs();
+            for (int index = 0; index < inputs.size(); ++index) {
+                String inputName = inputs.get(index);
+                if (!(validNameSet.contains(inputName))) {

Review Comment:
   !(validNameSet.contains(inputName)) -> !validNameSet.contains(inputName)



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();

Review Comment:
   Why is it get(0)?



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (TransformResponse transform : transforms) {
+            for (StreamField field : transform.getFieldList()) {
+                String originNodeName = field.getOriginNodeName();
+                if (!(validNameSet.contains(originNodeName))) {
+                    // in standard mode transform input node must either be mq source node or transform node,
+                    // otherwise replace it with mq node name, which should be stream id
+                    field.setOriginNodeName(mqNodeName);
+                }
+            }
+        }
+    }
+
+    private void adjustNodeRelations(List<NodeRelation> relations, InlongGroupInfo groupInfo,
+            List<StreamSource> sources, List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set relations' input node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();

Review Comment:
   Why is it get(0)?



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##########
@@ -137,6 +148,58 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo>
         return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
+    private Set<String> getValidInputNodeNamesInStandardMode(List<StreamSource> mqSources,
+            List<TransformResponse> transforms) {
+        Set<String> mqSourceNameSet = mqSources.stream().map(StreamSource::getSourceName).collect(Collectors.toSet());
+        Set<String> transformNameSet = transforms.stream().map(TransformResponse::getTransformName)
+                .collect(Collectors.toSet());
+        Set<String> result = Sets.newHashSet();
+        result.addAll(mqSourceNameSet);
+        result.addAll(transformNameSet);
+        return result;
+    }
+
+    private void preprocessTransformList(InlongGroupInfo groupInfo, List<StreamSource> sources,
+            List<TransformResponse> transforms) {
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            return;
+        }
+
+        // set transform fields' origin node to mq node when necessary
+        String mqNodeName = sources.get(0).getSourceName();
+        Set<String> validNameSet = getValidInputNodeNamesInStandardMode(sources, transforms);
+        for (TransformResponse transform : transforms) {
+            for (StreamField field : transform.getFieldList()) {
+                String originNodeName = field.getOriginNodeName();
+                if (!(validNameSet.contains(originNodeName))) {

Review Comment:
   !(validNameSet.contains(originNodeName)) -> !validNameSet.contains(originNodeName)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org