You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/29 10:27:45 UTC

[incubator-inlong] branch master updated: [INLONG-4026][Manager] Fix field type of StreamSourceFieldMapper (#4027)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3298b72f4  [INLONG-4026][Manager] Fix field type of StreamSourceFieldMapper (#4027)
3298b72f4 is described below

commit 3298b72f4357eefab94df7f602db66551e6de51a
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Fri Apr 29 18:27:40 2022 +0800

     [INLONG-4026][Manager] Fix field type of StreamSourceFieldMapper (#4027)
    
    * Fix field type of StreamSourceFieldMapper
    
    * change preNode sequence of joinDefinition
    
    * change logic operator
---
 .../java/org/apache/inlong/manager/common/enums/FieldType.java   | 2 +-
 .../inlong/manager/dao/entity/StreamSourceFieldEntity.java       | 4 ++--
 .../inlong/manager/dao/entity/StreamTransformFieldEntity.java    | 4 ++--
 .../inlong/manager/service/sort/util/NodeRelationShipUtils.java  | 9 +++++----
 .../manager/service/transform/StreamTransformServiceImpl.java    | 7 ++++++-
 5 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index c67df3f9a..735cf6b62 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -48,7 +48,7 @@ public enum FieldType {
     public static FieldType forName(String name) {
         Preconditions.checkNotNull(name, "FieldType should not be null");
         for (FieldType value : values()) {
-            if (value.toString().equals(name) || value.toString().equals(name.toLowerCase(Locale.ROOT))) {
+            if (value.toString().equals(name) || value.toString().equals(name.toUpperCase(Locale.ROOT))) {
                 return value;
             }
         }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
index a9d3f9f60..bd845d5d2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
@@ -43,11 +43,11 @@ public class StreamSourceFieldEntity implements Serializable {
 
     private String fieldComment;
 
-    private Short isMetaField;
+    private Integer isMetaField;
 
     private String fieldFormat;
 
-    private Short rankNum;
+    private Integer rankNum;
 
     private Integer isDeleted;
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
index 9fe374b15..d88eea123 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
@@ -44,11 +44,11 @@ public class StreamTransformFieldEntity implements Serializable {
 
     private String fieldComment;
 
-    private Short isMetaField;
+    private Integer isMetaField;
 
     private String fieldFormat;
 
-    private Short rankNum;
+    private Integer rankNum;
 
     private Integer isDeleted;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index ea004bbf1..177850240 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -119,6 +119,7 @@ public class NodeRelationShipUtils {
         JoinMode joinMode = joinerDefinition.getJoinMode();
         String leftNode = getNodeName(joinerDefinition.getLeftNode());
         String rightNode = getNodeName(joinerDefinition.getRightNode());
+        List<String> preNodes = Lists.newArrayList(leftNode, rightNode);
         List<StreamField> leftJoinFields = joinerDefinition.getLeftJoinFields();
         List<StreamField> rightJoinFields = joinerDefinition.getRightJoinFields();
         List<FilterFunction> filterFunctions = Lists.newArrayList();
@@ -132,19 +133,19 @@ public class NodeRelationShipUtils {
                 operator = EmptyOperator.getInstance();
             }
             filterFunctions.add(
-                    createFilterFunction(leftField, rightField, leftNode, rightNode, AndOperator.getInstance()));
+                    createFilterFunction(leftField, rightField, leftNode, rightNode, operator));
         }
         Map<String, List<FilterFunction>> joinConditions = Maps.newHashMap();
         joinConditions.put(rightNode, filterFunctions);
         switch (joinMode) {
             case LEFT_JOIN:
-                return new LeftOuterJoinNodeRelationShip(nodeRelationShip.getInputs(), nodeRelationShip.getOutputs(),
+                return new LeftOuterJoinNodeRelationShip(preNodes, nodeRelationShip.getOutputs(),
                         joinConditions);
             case INNER_JOIN:
-                return new RightOuterJoinNodeRelationShip(nodeRelationShip.getInputs(), nodeRelationShip.getOutputs(),
+                return new RightOuterJoinNodeRelationShip(preNodes, nodeRelationShip.getOutputs(),
                         joinConditions);
             case RIGHT_JOIN:
-                return new InnerJoinNodeRelationShip(nodeRelationShip.getInputs(), nodeRelationShip.getOutputs(),
+                return new InnerJoinNodeRelationShip(preNodes, nodeRelationShip.getOutputs(),
                         joinConditions);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported join mode=%s for inlong", joinMode));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 9d3a31b2a..24befb30d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.GlobalConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
@@ -109,6 +110,8 @@ public class StreamTransformServiceImpl implements StreamTransformService {
                 .map(transformFieldEntity -> {
                     StreamField fieldInfo = CommonBeanUtils.copyProperties(transformFieldEntity,
                             StreamField::new);
+                    fieldInfo.setFieldType(FieldType.forName(transformFieldEntity.getFieldType()));
+                    fieldInfo.setId(Integer.valueOf(transformFieldEntity.getRankNum()));
                     return Pair.of(transformFieldEntity.getTransformId(), fieldInfo);
                 }).collect(Collectors.groupingBy(Pair::getLeft,
                         Collectors.mapping(Pair::getRight, Collectors.toList())));
@@ -216,12 +219,14 @@ public class StreamTransformServiceImpl implements StreamTransformService {
             if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                 fieldEntity.setFieldComment(fieldEntity.getFieldName());
             }
+            fieldEntity.setId(null);
             fieldEntity.setInlongGroupId(groupId);
             fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setFieldType(fieldInfo.getFieldType().name());
+            fieldEntity.setRankNum(fieldInfo.getId());
             fieldEntity.setTransformId(transformId);
             fieldEntity.setTransformType(transformType);
             fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
-            fieldEntity.setOriginNodeName(fieldInfo.getOriginNodeName());
             entityList.add(fieldEntity);
         }