You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/25 09:06:24 UTC
[inlong] 01/03: [INLONG-5680][Manager][Sort] Fix field relation object generate error (#5693)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 34cbdfab0db0894a0b53ebed0183ef4f3c526bfc
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Thu Aug 25 15:06:46 2022 +0800
[INLONG-5680][Manager][Sort] Fix field relation object generate error (#5693)
---
.../manager/pojo/sort/util/FieldRelationUtils.java | 24 ++++++++++------------
.../manager/pojo/sort/util/LoadNodeUtils.java | 11 +++++-----
.../inlong/sort/parser/impl/FlinkSqlParser.java | 8 +++-----
3 files changed, 19 insertions(+), 24 deletions(-)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
index 870ad42ee..4f8df5f14 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java
@@ -20,9 +20,9 @@ package org.apache.inlong.manager.pojo.sort.util;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import java.util.Objects;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.TransformDefinition;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
@@ -33,7 +33,6 @@ import org.apache.inlong.manager.pojo.transform.replacer.StringReplacerDefinitio
import org.apache.inlong.manager.pojo.transform.replacer.StringReplacerDefinition.ReplaceRule;
import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition;
import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition.SplitRule;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.StringTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
@@ -50,6 +49,7 @@ import org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunctio
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -84,9 +84,9 @@ public class FieldRelationUtils {
preNodes, constantFieldMap);
case DE_DUPLICATION:
case FILTER:
- return createFieldRelations(fieldList, transformName, constantFieldMap);
+ return createFieldRelations(fieldList, constantFieldMap);
case JOINER:
- return createJoinerFieldRelations(fieldList, transformName, constantFieldMap);
+ return createJoinerFieldRelations(fieldList, constantFieldMap);
default:
throw new UnsupportedOperationException(
String.format("Unsupported transformType=%s", transformType));
@@ -96,7 +96,7 @@ public class FieldRelationUtils {
/**
* Create relation of fields.
*/
- private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, String transformName,
+ private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList,
Map<String, StreamField> constantFieldMap) {
return fieldList.stream()
.map(FieldInfoUtils::parseStreamField)
@@ -115,8 +115,7 @@ public class FieldRelationUtils {
inputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getNodeId(),
fieldInfo.getFormatInfo());
}
- FieldInfo outputField = new FieldInfo(fieldInfo.getName(), transformName,
- fieldInfo.getFormatInfo());
+ FieldInfo outputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getFormatInfo());
return new FieldRelation(inputField, outputField);
}).collect(Collectors.toList());
}
@@ -124,7 +123,7 @@ public class FieldRelationUtils {
/**
* Create relation of fields in join function.
*/
- private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, String transformName,
+ private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList,
Map<String, StreamField> constantFieldMap) {
return fieldList.stream()
.map(streamField -> {
@@ -144,8 +143,7 @@ public class FieldRelationUtils {
inputField = new FieldInfo(streamField.getOriginFieldName(),
streamField.getOriginNodeName(), formatInfo);
}
- FieldInfo outputField = new FieldInfo(streamField.getFieldName(),
- transformName, formatInfo);
+ FieldInfo outputField = new FieldInfo(streamField.getFieldName(), formatInfo);
return new FieldRelation(inputField, outputField);
}).collect(Collectors.toList());
}
@@ -170,7 +168,7 @@ public class FieldRelationUtils {
List<StreamField> filteredFieldList = fieldList.stream()
.filter(streamFieldInfo -> !splitFields.contains(streamFieldInfo.getFieldName()))
.collect(Collectors.toList());
- fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
+ fieldRelations.addAll(createFieldRelations(filteredFieldList, constantFieldMap));
return fieldRelations;
}
@@ -190,7 +188,7 @@ public class FieldRelationUtils {
List<StreamField> filteredFieldList = fieldList.stream()
.filter(streamFieldInfo -> !replaceFields.contains(streamFieldInfo.getFieldName()))
.collect(Collectors.toList());
- fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
+ fieldRelations.addAll(createFieldRelations(filteredFieldList, constantFieldMap));
return fieldRelations;
}
@@ -209,7 +207,7 @@ public class FieldRelationUtils {
List<StreamField> filteredFieldList = fieldList.stream()
.filter(streamFieldInfo -> !encryptFields.contains(streamFieldInfo.getFieldName()))
.collect(Collectors.toList());
- fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
+ fieldRelations.addAll(createFieldRelations(filteredFieldList, constantFieldMap));
return fieldRelations;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index 6e96d7bfd..bd1b83bfb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -21,8 +21,8 @@ import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -71,12 +71,12 @@ import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
/**
* Util for load node info.
@@ -102,8 +102,7 @@ public class LoadNodeUtils {
List<FieldInfo> fieldInfos = streamSink.getSinkFieldList().stream()
.map(field -> FieldInfoUtils.parseSinkFieldInfo(field, streamSink.getSinkName()))
.collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(),
- streamSink.getSinkName(), constantFieldMap);
+ List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap);
Map<String, String> properties = streamSink.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
String sinkType = streamSink.getSinkType();
@@ -497,7 +496,7 @@ public class LoadNodeUtils {
/**
* Parse information field of data sink.
*/
- public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, String sinkName,
+ public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList,
Map<String, StreamField> constantFieldMap) {
if (CollectionUtils.isEmpty(fieldList)) {
return Lists.newArrayList();
@@ -505,7 +504,7 @@ public class LoadNodeUtils {
return fieldList.stream()
.filter(sinkField -> StringUtils.isNotEmpty(sinkField.getSourceFieldName()))
.map(field -> {
- FieldInfo outputField = new FieldInfo(field.getFieldName(), sinkName,
+ FieldInfo outputField = new FieldInfo(field.getFieldName(),
FieldInfoUtils.convertFieldFormat(field.getFieldType(), field.getFieldFormat()));
FunctionParam inputField;
String fieldKey = String.format("%s-%s", field.getOriginNodeName(), field.getSourceFieldName());
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 02722a9c5..3cc166601 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -338,17 +338,15 @@ public class FlinkSqlParser implements Parser {
// Generate mapping for output field to FieldRelation
fieldRelations.forEach(s -> {
// All field relations of input nodes will be the same if the node id of output field is blank.
- // Currently, the node id in the output file is used to distinguish which field of the node in the upstream
- // of the union the field comes from. A better way is through the upstream input field,
+ // Currently, the node id in the output field is used to distinguish which field of the node in the
+ // upstream of the union the field comes from. A better way is through the upstream input field,
// but this abstraction does not yet have the ability to set node ids for all upstream input fields.
// todo optimize the implementation of this block in the future
String nodeId = s.getOutputField().getNodeId();
if (StringUtils.isBlank(nodeId)) {
nodeId = unionRelation.getInputs().get(0);
}
- Map<String, FieldRelation> subRelationMap = fieldRelationMap
- .computeIfAbsent(nodeId, k -> new HashMap<>());
- subRelationMap.put(s.getOutputField().getName(), s);
+ fieldRelationMap.computeIfAbsent(nodeId, k -> new HashMap<>()).put(s.getOutputField().getName(), s);
});
StringBuilder sb = new StringBuilder();
sb.append(genUnionSingleSelectSql(unionRelation.getInputs().get(0),