You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 04:59:29 UTC

[inlong] 07/09: [INLONG-5680][Manager][Sort] Fix field relation object generate error (#5693)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 34cbdfab0db0894a0b53ebed0183ef4f3c526bfc
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Thu Aug 25 15:06:46 2022 +0800

    [INLONG-5680][Manager][Sort] Fix field relation object generate error (#5693)
---
 .../manager/pojo/sort/util/FieldRelationUtils.java | 24 ++++++++++------------
 .../manager/pojo/sort/util/LoadNodeUtils.java      | 11 +++++-----
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |  8 +++-----
 3 files changed, 19 insertions(+), 24 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
index 870ad42ee..4f8df5f14 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
@@ -20,9 +20,9 @@ package org.apache.inlong.manager.pojo.sort.util;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.util.Objects;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.transform.TransformDefinition;
 import org.apache.inlong.manager.pojo.transform.TransformResponse;
@@ -33,7 +33,6 @@ import org.apache.inlong.manager.pojo.transform.replacer.StringReplacerDefinitio
 import org.apache.inlong.manager.pojo.transform.replacer.StringReplacerDefinition.ReplaceRule;
 import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition;
 import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition.SplitRule;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.StringTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
@@ -50,6 +49,7 @@ import org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunctio
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -84,9 +84,9 @@ public class FieldRelationUtils {
                         preNodes, constantFieldMap);
             case DE_DUPLICATION:
             case FILTER:
-                return createFieldRelations(fieldList, transformName, constantFieldMap);
+                return createFieldRelations(fieldList, constantFieldMap);
             case JOINER:
-                return createJoinerFieldRelations(fieldList, transformName, constantFieldMap);
+                return createJoinerFieldRelations(fieldList, constantFieldMap);
             default:
                 throw new UnsupportedOperationException(
                         String.format("Unsupported transformType=%s", transformType));
@@ -96,7 +96,7 @@ public class FieldRelationUtils {
     /**
      * Create relation of fields.
      */
-    private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, String transformName,
+    private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList,
             Map<String, StreamField> constantFieldMap) {
         return fieldList.stream()
                 .map(FieldInfoUtils::parseStreamField)
@@ -115,8 +115,7 @@ public class FieldRelationUtils {
                         inputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getNodeId(),
                                 fieldInfo.getFormatInfo());
                     }
-                    FieldInfo outputField = new FieldInfo(fieldInfo.getName(), transformName,
-                            fieldInfo.getFormatInfo());
+                    FieldInfo outputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getFormatInfo());
                     return new FieldRelation(inputField, outputField);
                 }).collect(Collectors.toList());
     }
@@ -124,7 +123,7 @@ public class FieldRelationUtils {
     /**
      * Create relation of fields in join function.
      */
-    private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, String transformName,
+    private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList,
             Map<String, StreamField> constantFieldMap) {
         return fieldList.stream()
                 .map(streamField -> {
@@ -144,8 +143,7 @@ public class FieldRelationUtils {
                         inputField = new FieldInfo(streamField.getOriginFieldName(),
                                 streamField.getOriginNodeName(), formatInfo);
                     }
-                    FieldInfo outputField = new FieldInfo(streamField.getFieldName(),
-                            transformName, formatInfo);
+                    FieldInfo outputField = new FieldInfo(streamField.getFieldName(), formatInfo);
                     return new FieldRelation(inputField, outputField);
                 }).collect(Collectors.toList());
     }
@@ -170,7 +168,7 @@ public class FieldRelationUtils {
         List<StreamField> filteredFieldList = fieldList.stream()
                 .filter(streamFieldInfo -> !splitFields.contains(streamFieldInfo.getFieldName()))
                 .collect(Collectors.toList());
-        fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
+        fieldRelations.addAll(createFieldRelations(filteredFieldList, constantFieldMap));
         return fieldRelations;
     }
 
@@ -190,7 +188,7 @@ public class FieldRelationUtils {
         List<StreamField> filteredFieldList = fieldList.stream()
                 .filter(streamFieldInfo -> !replaceFields.contains(streamFieldInfo.getFieldName()))
                 .collect(Collectors.toList());
-        fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
+        fieldRelations.addAll(createFieldRelations(filteredFieldList, constantFieldMap));
         return fieldRelations;
     }
 
@@ -209,7 +207,7 @@ public class FieldRelationUtils {
         List<StreamField> filteredFieldList = fieldList.stream()
                 .filter(streamFieldInfo -> !encryptFields.contains(streamFieldInfo.getFieldName()))
                 .collect(Collectors.toList());
-        fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
+        fieldRelations.addAll(createFieldRelations(filteredFieldList, constantFieldMap));
         return fieldRelations;
     }
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 6e96d7bfd..bd1b83bfb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -21,8 +21,8 @@ import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -71,12 +71,12 @@ import org.apache.inlong.sort.protocol.transformation.ConstantParam;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FunctionParam;
 import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
 
 /**
  * Util for load node info.
@@ -102,8 +102,7 @@ public class LoadNodeUtils {
         List<FieldInfo> fieldInfos = streamSink.getSinkFieldList().stream()
                 .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, streamSink.getSinkName()))
                 .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(),
-                streamSink.getSinkName(), constantFieldMap);
+        List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap);
         Map<String, String> properties = streamSink.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
         String sinkType = streamSink.getSinkType();
@@ -497,7 +496,7 @@ public class LoadNodeUtils {
     /**
      * Parse information field of data sink.
      */
-    public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, String sinkName,
+    public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList,
             Map<String, StreamField> constantFieldMap) {
         if (CollectionUtils.isEmpty(fieldList)) {
             return Lists.newArrayList();
@@ -505,7 +504,7 @@ public class LoadNodeUtils {
         return fieldList.stream()
                 .filter(sinkField -> StringUtils.isNotEmpty(sinkField.getSourceFieldName()))
                 .map(field -> {
-                    FieldInfo outputField = new FieldInfo(field.getFieldName(), sinkName,
+                    FieldInfo outputField = new FieldInfo(field.getFieldName(),
                             FieldInfoUtils.convertFieldFormat(field.getFieldType(), field.getFieldFormat()));
                     FunctionParam inputField;
                     String fieldKey = String.format("%s-%s", field.getOriginNodeName(), field.getSourceFieldName());
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 02722a9c5..3cc166601 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -338,17 +338,15 @@ public class FlinkSqlParser implements Parser {
         // Generate mapping for output field to FieldRelation
         fieldRelations.forEach(s -> {
             // All field relations of input nodes will be the same if the node id of output field is blank.
-            // Currently, the node id in the output file is used to distinguish which field of the node in the upstream
-            // of the union the field comes from. A better way is through the upstream input field,
+            // Currently, the node id in the output field is used to distinguish which field of the node in the
+            // upstream of the union the field comes from. A better way is through the upstream input field,
             // but this abstraction does not yet have the ability to set node ids for all upstream input fields.
             // todo optimize the implementation of this block in the future
             String nodeId = s.getOutputField().getNodeId();
             if (StringUtils.isBlank(nodeId)) {
                 nodeId = unionRelation.getInputs().get(0);
             }
-            Map<String, FieldRelation> subRelationMap = fieldRelationMap
-                    .computeIfAbsent(nodeId, k -> new HashMap<>());
-            subRelationMap.put(s.getOutputField().getName(), s);
+            fieldRelationMap.computeIfAbsent(nodeId, k -> new HashMap<>()).put(s.getOutputField().getName(), s);
         });
         StringBuilder sb = new StringBuilder();
         sb.append(genUnionSingleSelectSql(unionRelation.getInputs().get(0),