You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/26 08:19:56 UTC
[inlong] branch release-1.3.0 updated: [INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode (#6010)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 5f8c997a0 [INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode (#6010)
5f8c997a0 is described below
commit 5f8c997a02a098a885ba4c408b0270e6401d60b1
Author: woofyzhao <49...@qq.com>
AuthorDate: Mon Sep 26 16:12:36 2022 +0800
[INLONG-6003][Manager] Change the transform source node name to be MQ node in standard mode (#6010)
* Change the transform source node name to be MQ node in standard mode
* Reuse some params, and use an iterator to loop and modify the List
Co-authored-by: healchow <he...@gmail.com>
---
.../resource/sort/DefaultSortConfigOperator.java | 91 ++++++++++++++++++----
1 file changed, 74 insertions(+), 17 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 53ece64dc..6096777ac 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -17,23 +17,22 @@
package org.apache.inlong.manager.service.resource.sort;
-import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils;
+import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
+import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.transform.TransformResponse;
import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.pojo.sort.util.ExtractNodeUtils;
-import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
-import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
-import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.transform.StreamTransformService;
import org.apache.inlong.sort.protocol.GroupInfo;
@@ -47,8 +46,11 @@ import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -99,8 +101,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
// get sink info
Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
- List<TransformResponse> transformResponses = transformService.listTransform(groupInfo.getInlongGroupId(), null);
- Map<String, List<TransformResponse>> transformMap = transformResponses.stream()
+ List<TransformResponse> transformList = transformService.listTransform(groupInfo.getInlongGroupId(), null);
+ Map<String, List<TransformResponse>> transformMap = transformList.stream()
.collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
Collectors.toCollection(ArrayList::new)));
@@ -111,35 +113,90 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
inlongStream.getSourceList().forEach(
source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
- List<TransformResponse> transformResponseList = transformMap.get(streamId);
- if (CollectionUtils.isNotEmpty(transformResponseList)) {
- transformResponseList.forEach(
+ List<TransformResponse> transformResponses = transformMap.get(streamId);
+ if (CollectionUtils.isNotEmpty(transformResponses)) {
+ transformResponses.forEach(
trans -> parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), fieldMap));
}
// build a stream info from the nodes and relations
List<StreamSource> sources = sourceMap.get(streamId);
List<StreamSink> sinks = sinkMap.get(streamId);
- List<Node> nodes = this.createNodes(sources, transformResponseList, sinks, fieldMap);
List<NodeRelation> relations;
- if (CollectionUtils.isEmpty(transformResponseList)) {
+ if (CollectionUtils.isEmpty(transformResponses)) {
relations = NodeRelationUtils.createNodeRelations(sources, sinks);
} else {
relations = NodeRelationUtils.createNodeRelations(inlongStream);
+ // in standard mode, replace the upstream source nodes and the transform input field nodes with the MQ node
+ if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
+ // the MQ node name, which is the inlong stream id
+ String mqNodeName = sources.get(0).getSourceName();
+ Set<String> nodeNameSet = getInputNodeNames(sources, transformResponses);
+ adjustTransformField(transformResponses, nodeNameSet, mqNodeName);
+ adjustNodeRelations(relations, nodeNameSet, mqNodeName);
+ }
}
+
+ // create extract-transform-load nodes
+ List<Node> nodes = this.createNodes(sources, transformResponses, sinks, fieldMap);
+
StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
sortStreamInfos.add(streamInfo);
- // rebuild joinerNode relation if transformResponseList is not empty
- NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
+ // rebuild joinerNode relation
+ NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponses);
}
return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
}
+    /**
+     * Collects the distinct node names of the given sources and transforms.
+     *
+     * @param sources stream sources whose source names are collected; skipped when null or empty
+     * @param transforms transform responses whose transform names are collected; skipped when null or empty
+     * @return a new mutable set containing every collected source name and transform name
+     */
+    private Set<String> getInputNodeNames(List<StreamSource> sources, List<TransformResponse> transforms) {
+        Set<String> nodeNames = new HashSet<>();
+        if (CollectionUtils.isNotEmpty(sources)) {
+            for (StreamSource source : sources) {
+                nodeNames.add(source.getSourceName());
+            }
+        }
+        if (CollectionUtils.isNotEmpty(transforms)) {
+            for (TransformResponse transform : transforms) {
+                nodeNames.add(transform.getTransformName());
+            }
+        }
+        return nodeNames;
+    }
+
+    /**
+     * Set origin node to mq node for transform fields if necessary.
+     *
+     * In standard mode for InlongGroup, a transform input node must be either the
+     * mq source node or a transform node; any other origin node name is replaced
+     * with the mq node name. Fields are modified in place.
+     *
+     * @param transforms transforms whose fields are checked; a transform without a field list is skipped
+     * @param nodeNameSet node names that are accepted as origin nodes and left untouched
+     * @param mqNodeName node name written into every field whose origin node is not in {@code nodeNameSet}
+     */
+    private void adjustTransformField(List<TransformResponse> transforms, Set<String> nodeNameSet, String mqNodeName) {
+        for (TransformResponse transform : transforms) {
+            List<StreamField> fields = transform.getFieldList();
+            if (fields == null) {
+                // a transform with no field list has nothing to adjust; avoid an NPE on iteration
+                continue;
+            }
+            for (StreamField field : fields) {
+                if (!nodeNameSet.contains(field.getOriginNodeName())) {
+                    field.setOriginNodeName(mqNodeName);
+                }
+            }
+        }
+    }
+
+    /**
+     * Redirects relation inputs to the MQ node.
+     *
+     * Every input name of every relation that is not contained in {@code nodeNameSet}
+     * is overwritten with {@code mqNodeName}; all other inputs are left as they are.
+     *
+     * @param relations node relations whose input lists are modified in place
+     * @param nodeNameSet input names that must be kept unchanged
+     * @param mqNodeName name that replaces every input not found in {@code nodeNameSet}
+     */
+    private void adjustNodeRelations(List<NodeRelation> relations, Set<String> nodeNameSet, String mqNodeName) {
+        for (NodeRelation relation : relations) {
+            // replace through the iterator so the input list keeps its size and order
+            for (ListIterator<String> inputs = relation.getInputs().listIterator(); inputs.hasNext(); ) {
+                String inputName = inputs.next();
+                if (nodeNameSet.contains(inputName)) {
+                    continue;
+                }
+                inputs.set(mqNodeName);
+            }
+        }
+    }
+
private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses,
List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
- List<Node> nodes = Lists.newArrayList();
+ List<Node> nodes = new ArrayList<>();
nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
@@ -170,7 +227,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
*/
private void addToGroupExt(InlongGroupInfo groupInfo, String value) {
if (groupInfo.getExtList() == null) {
- groupInfo.setExtList(Lists.newArrayList());
+ groupInfo.setExtList(new ArrayList<>());
}
InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
@@ -188,7 +245,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
private void addToStreamExt(List<InlongStreamInfo> streamInfos, String value) {
streamInfos.forEach(streamInfo -> {
if (streamInfo.getExtList() == null) {
- streamInfo.setExtList(Lists.newArrayList());
+ streamInfo.setExtList(new ArrayList<>());
}
InlongStreamExtInfo extInfo = new InlongStreamExtInfo();