You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/25 12:25:24 UTC

[incubator-inlong] branch master updated: [INLONG-3929][Manager] Support deDuplicate transform in manager (#3933)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bd7da09aa [INLONG-3929][Manager] Support deDuplicate transform in manager (#3933)
bd7da09aa is described below

commit bd7da09aadfe993e16f51e448176d01d5678273f
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Apr 25 20:25:18 2022 +0800

    [INLONG-3929][Manager] Support deDuplicate transform in manager (#3933)
---
 .../service/sort/util/TransformNodeUtils.java      | 72 +++++++++++++++++++++-
 1 file changed, 70 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index 73f49e646..77e65ed35 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -19,9 +19,17 @@ package org.apache.inlong.manager.service.sort.util;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
 import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition.DeDuplicationStrategy;
+import org.apache.inlong.manager.common.util.StreamParseUtils;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
+import org.apache.inlong.sort.protocol.transformation.OrderDirection;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -31,7 +39,7 @@ import java.util.stream.Collectors;
  */
 public class TransformNodeUtils {
 
-    public static List<TransformNode>  createTransformNodes(List<TransformResponse> transformResponses) {
+    public static List<TransformNode> createTransformNodes(List<TransformResponse> transformResponses) {
         if (CollectionUtils.isEmpty(transformResponses)) {
             return Lists.newArrayList();
         }
@@ -41,6 +49,67 @@ public class TransformNodeUtils {
     }
 
     public static TransformNode createTransformNode(TransformResponse transformResponse) {
+        TransformType type = TransformType.forType(transformResponse.getTransformType());
+        if (type != TransformType.DE_DUPLICATION) {
+            // Every type other than de-duplication is built as a plain transform node.
+            return createNormalTransformNode(transformResponse);
+        }
+        TransformDefinition definition = StreamParseUtils.parseTransformDefinition(
+                transformResponse.getTransformDefinition(), type);
+        return createDistinctNode((DeDuplicationDefinition) definition, transformResponse);
+    }
+
+    /**
+     * Create a distinct node from the de-duplication definition and its transform response.
+     *
+     * @param deDuplicationDefinition supplies the duplicate fields, timing field and strategy
+     * @param transformResponse supplies the transform name and the base transform node
+     * @return distinct node; RESERVE_LAST orders DESC and RESERVE_FIRST orders ASC
+     * @throws UnsupportedOperationException if the strategy is neither of those two
+     */
+    public static DistinctNode createDistinctNode(DeDuplicationDefinition deDuplicationDefinition,
+            TransformResponse transformResponse) {
+        String transformName = transformResponse.getTransformName();
+        List<StreamField> streamFields = deDuplicationDefinition.getDupFields();
+        List<FieldInfo> distinctFields = streamFields.stream()
+                .map(streamField -> new FieldInfo(streamField.getFieldName(), transformName,
+                        FieldInfoUtils.convertFieldFormat(streamField.getFieldType().name(),
+                                streamField.getFieldFormat())))
+                .collect(Collectors.toList());
+        StreamField timingField = deDuplicationDefinition.getTimingField();
+        FieldInfo orderField = new FieldInfo(timingField.getFieldName(), transformName,
+                FieldInfoUtils.convertFieldFormat(timingField.getFieldType().name(), timingField.getFieldFormat()));
+        DeDuplicationStrategy deDuplicationStrategy = deDuplicationDefinition.getDeDuplicationStrategy();
+        OrderDirection orderDirection;
+        switch (deDuplicationStrategy) {
+            case RESERVE_LAST:
+                orderDirection = OrderDirection.DESC;
+                break;
+            case RESERVE_FIRST:
+                orderDirection = OrderDirection.ASC;
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unsupported deduplication strategy=%s for inlong", deDuplicationStrategy));
+        }
+        // Use the plain builder: createTransformNode would dispatch back here for DE_DUPLICATION and never return.
+        TransformNode transformNode = createNormalTransformNode(transformResponse);
+        return new DistinctNode(transformNode.getId(), transformNode.getName(),
+                transformNode.getFields(),
+                transformNode.getFieldRelationShips(),
+                transformNode.getFilters(),
+                distinctFields,
+                orderField,
+                orderDirection);
+    }
+
+    /**
+     * Create transform node based on transformResponse
+     *
+     * @param transformResponse transform response; its transform name becomes the node id and name
+     * @return the populated transform node
+     */
+    public static TransformNode createNormalTransformNode(TransformResponse transformResponse) {
         TransformNode transformNode = new TransformNode();
         transformNode.setId(transformResponse.getTransformName());
         transformNode.setName(transformResponse.getTransformName());
@@ -59,5 +128,4 @@ public class TransformNodeUtils {
                 FilterFunctionUtils.createFilterFunctions(transformResponse));
         return transformNode;
     }
-
 }