You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/25 12:25:24 UTC
[incubator-inlong] branch master updated: [INLONG-3929][Manager] Support deDuplicate transform in manager (#3933)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new bd7da09aa [INLONG-3929][Manager] Support deDuplicate transform in manager (#3933)
bd7da09aa is described below
commit bd7da09aadfe993e16f51e448176d01d5678273f
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Apr 25 20:25:18 2022 +0800
[INLONG-3929][Manager] Support deDuplicate transform in manager (#3933)
---
.../service/sort/util/TransformNodeUtils.java | 72 +++++++++++++++++++++-
1 file changed, 70 insertions(+), 2 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index 73f49e646..77e65ed35 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -19,9 +19,17 @@ package org.apache.inlong.manager.service.sort.util;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition.DeDuplicationStrategy;
+import org.apache.inlong.manager.common.util.StreamParseUtils;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
+import org.apache.inlong.sort.protocol.transformation.OrderDirection;
import java.util.List;
import java.util.stream.Collectors;
@@ -31,7 +39,7 @@ import java.util.stream.Collectors;
*/
public class TransformNodeUtils {
- public static List<TransformNode> createTransformNodes(List<TransformResponse> transformResponses) {
+ public static List<TransformNode> createTransformNodes(List<TransformResponse> transformResponses) {
if (CollectionUtils.isEmpty(transformResponses)) {
return Lists.newArrayList();
}
@@ -41,6 +49,67 @@ public class TransformNodeUtils {
}
public static TransformNode createTransformNode(TransformResponse transformResponse) {
+ // Only de-duplication transforms get a dedicated node; every other type uses the plain builder.
+ TransformType type = TransformType.forType(transformResponse.getTransformType());
+ if (type != TransformType.DE_DUPLICATION) {
+ return createNormalTransformNode(transformResponse);
+ }
+ DeDuplicationDefinition definition = (DeDuplicationDefinition) StreamParseUtils.parseTransformDefinition(
+ transformResponse.getTransformDefinition(), type);
+ return createDistinctNode(definition, transformResponse);
+ }
+
+ /**
+ * Create a distinct node from the de-duplication definition of a transform.
+ *
+ * @param deDuplicationDefinition supplies the duplicate fields, the timing field and the strategy
+ * @param transformResponse transform response the underlying plain transform node is built from
+ * @return distinct node ordered on the timing field: DESC for RESERVE_LAST, ASC for RESERVE_FIRST
+ */
+ public static DistinctNode createDistinctNode(DeDuplicationDefinition deDuplicationDefinition,
+ TransformResponse transformResponse) {
+ String transformName = transformResponse.getTransformName();
+ List<StreamField> streamFields = deDuplicationDefinition.getDupFields();
+ List<FieldInfo> distinctFields = streamFields.stream()
+ .map(streamField -> new FieldInfo(streamField.getFieldName(), transformName,
+ FieldInfoUtils.convertFieldFormat(streamField.getFieldType().name(),
+ streamField.getFieldFormat())))
+ .collect(Collectors.toList());
+ StreamField timingField = deDuplicationDefinition.getTimingField();
+ FieldInfo orderField = new FieldInfo(timingField.getFieldName(), transformName,
+ FieldInfoUtils.convertFieldFormat(timingField.getFieldType().name(), timingField.getFieldFormat()));
+ DeDuplicationStrategy deDuplicationStrategy = deDuplicationDefinition.getDeDuplicationStrategy();
+ OrderDirection orderDirection;
+ switch (deDuplicationStrategy) {
+ case RESERVE_LAST:
+ orderDirection = OrderDirection.DESC;
+ break;
+ case RESERVE_FIRST:
+ orderDirection = OrderDirection.ASC;
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported deduplication strategy=%s for inlong", deDuplicationStrategy));
+ }
+ // Use the plain builder: createTransformNode would route DE_DUPLICATION back here and recurse forever.
+ TransformNode transformNode = createNormalTransformNode(transformResponse);
+ return new DistinctNode(transformNode.getId(),
+ transformNode.getName(),
+ transformNode.getFields(),
+ transformNode.getFieldRelationShips(),
+ transformNode.getFilters(),
+ distinctFields,
+ orderField,
+ orderDirection);
+ }
+
+ /**
+ * Create transform node based on transformResponse
+ *
+ * @param transformResponse
+ * @return
+ */
+ public static TransformNode createNormalTransformNode(TransformResponse transformResponse) {
TransformNode transformNode = new TransformNode();
transformNode.setId(transformResponse.getTransformName());
transformNode.setName(transformResponse.getTransformName());
@@ -59,5 +128,4 @@ public class TransformNodeUtils {
FilterFunctionUtils.createFilterFunctions(transformResponse));
return transformNode;
}
-
}