You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/29 10:27:45 UTC
[incubator-inlong] branch master updated: [INLONG-4026][Manager] Fix field type of StreamSourceFieldMapper (#4027)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3298b72f4 [INLONG-4026][Manager] Fix field type of StreamSourceFieldMapper (#4027)
3298b72f4 is described below
commit 3298b72f4357eefab94df7f602db66551e6de51a
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Fri Apr 29 18:27:40 2022 +0800
[INLONG-4026][Manager] Fix field type of StreamSourceFieldMapper (#4027)
* Fix field type of StreamSourceFieldMapper
* change preNode sequence of joinDefinition
* change logic operator
---
.../java/org/apache/inlong/manager/common/enums/FieldType.java | 2 +-
.../inlong/manager/dao/entity/StreamSourceFieldEntity.java | 4 ++--
.../inlong/manager/dao/entity/StreamTransformFieldEntity.java | 4 ++--
.../inlong/manager/service/sort/util/NodeRelationShipUtils.java | 9 +++++----
.../manager/service/transform/StreamTransformServiceImpl.java | 7 ++++++-
5 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index c67df3f9a..735cf6b62 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -48,7 +48,7 @@ public enum FieldType {
public static FieldType forName(String name) {
Preconditions.checkNotNull(name, "FieldType should not be null");
for (FieldType value : values()) {
- if (value.toString().equals(name) || value.toString().equals(name.toLowerCase(Locale.ROOT))) {
+ if (value.toString().equals(name) || value.toString().equals(name.toUpperCase(Locale.ROOT))) {
return value;
}
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
index a9d3f9f60..bd845d5d2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
@@ -43,11 +43,11 @@ public class StreamSourceFieldEntity implements Serializable {
private String fieldComment;
- private Short isMetaField;
+ private Integer isMetaField;
private String fieldFormat;
- private Short rankNum;
+ private Integer rankNum;
private Integer isDeleted;
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
index 9fe374b15..d88eea123 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
@@ -44,11 +44,11 @@ public class StreamTransformFieldEntity implements Serializable {
private String fieldComment;
- private Short isMetaField;
+ private Integer isMetaField;
private String fieldFormat;
- private Short rankNum;
+ private Integer rankNum;
private Integer isDeleted;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index ea004bbf1..177850240 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -119,6 +119,7 @@ public class NodeRelationShipUtils {
JoinMode joinMode = joinerDefinition.getJoinMode();
String leftNode = getNodeName(joinerDefinition.getLeftNode());
String rightNode = getNodeName(joinerDefinition.getRightNode());
+ List<String> preNodes = Lists.newArrayList(leftNode, rightNode);
List<StreamField> leftJoinFields = joinerDefinition.getLeftJoinFields();
List<StreamField> rightJoinFields = joinerDefinition.getRightJoinFields();
List<FilterFunction> filterFunctions = Lists.newArrayList();
@@ -132,19 +133,19 @@ public class NodeRelationShipUtils {
operator = EmptyOperator.getInstance();
}
filterFunctions.add(
- createFilterFunction(leftField, rightField, leftNode, rightNode, AndOperator.getInstance()));
+ createFilterFunction(leftField, rightField, leftNode, rightNode, operator));
}
Map<String, List<FilterFunction>> joinConditions = Maps.newHashMap();
joinConditions.put(rightNode, filterFunctions);
switch (joinMode) {
case LEFT_JOIN:
- return new LeftOuterJoinNodeRelationShip(nodeRelationShip.getInputs(), nodeRelationShip.getOutputs(),
+ return new LeftOuterJoinNodeRelationShip(preNodes, nodeRelationShip.getOutputs(),
joinConditions);
case INNER_JOIN:
- return new RightOuterJoinNodeRelationShip(nodeRelationShip.getInputs(), nodeRelationShip.getOutputs(),
+ return new RightOuterJoinNodeRelationShip(preNodes, nodeRelationShip.getOutputs(),
joinConditions);
case RIGHT_JOIN:
- return new InnerJoinNodeRelationShip(nodeRelationShip.getInputs(), nodeRelationShip.getOutputs(),
+ return new InnerJoinNodeRelationShip(preNodes, nodeRelationShip.getOutputs(),
joinConditions);
default:
throw new IllegalArgumentException(String.format("Unsupported join mode=%s for inlong", joinMode));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 9d3a31b2a..24befb30d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
@@ -109,6 +110,8 @@ public class StreamTransformServiceImpl implements StreamTransformService {
.map(transformFieldEntity -> {
StreamField fieldInfo = CommonBeanUtils.copyProperties(transformFieldEntity,
StreamField::new);
+ fieldInfo.setFieldType(FieldType.forName(transformFieldEntity.getFieldType()));
+ fieldInfo.setId(Integer.valueOf(transformFieldEntity.getRankNum()));
return Pair.of(transformFieldEntity.getTransformId(), fieldInfo);
}).collect(Collectors.groupingBy(Pair::getLeft,
Collectors.mapping(Pair::getRight, Collectors.toList())));
@@ -216,12 +219,14 @@ public class StreamTransformServiceImpl implements StreamTransformService {
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
}
+ fieldEntity.setId(null);
fieldEntity.setInlongGroupId(groupId);
fieldEntity.setInlongStreamId(streamId);
+ fieldEntity.setFieldType(fieldInfo.getFieldType().name());
+ fieldEntity.setRankNum(fieldInfo.getId());
fieldEntity.setTransformId(transformId);
fieldEntity.setTransformType(transformType);
fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
- fieldEntity.setOriginNodeName(fieldInfo.getOriginNodeName());
entityList.add(fieldEntity);
}