You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/21 07:06:58 UTC
[inlong] branch master updated: [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 27aa29f6b [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
27aa29f6b is described below
commit 27aa29f6bb1e60078b04e7dbb77d6a2234939da3
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Sep 21 15:06:53 2022 +0800
[INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
---
.../resource/sort/DefaultSortConfigOperator.java | 67 +++++-----------------
1 file changed, 14 insertions(+), 53 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index d9f8d37a7..53ece64dc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -80,14 +80,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
return;
}
- GroupInfo configInfo;
- // if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source
- if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
- configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
- } else {
- configInfo = this.getStandardGroupInfo(groupInfo, streamInfos);
- }
-
+ GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
if (isStream) {
this.addToStreamExt(streamInfos, dataflow);
@@ -100,7 +93,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
}
}
- private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
// get source info
Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
// get sink info
@@ -117,6 +110,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
Map<String, StreamField> fieldMap = new HashMap<>();
inlongStream.getSourceList().forEach(
source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
+
List<TransformResponse> transformResponseList = transformMap.get(streamId);
if (CollectionUtils.isNotEmpty(transformResponseList)) {
transformResponseList.forEach(
@@ -124,59 +118,26 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
}
// build a stream info from the nodes and relations
- List<Node> nodes = this.createNodesWithTransform(sourceMap.get(streamId),
- transformResponseList, sinkMap.get(streamId), fieldMap);
- List<NodeRelation> relations = NodeRelationUtils.createNodeRelations(inlongStream);
+ List<StreamSource> sources = sourceMap.get(streamId);
+ List<StreamSink> sinks = sinkMap.get(streamId);
+ List<Node> nodes = this.createNodes(sources, transformResponseList, sinks, fieldMap);
+ List<NodeRelation> relations;
+ if (CollectionUtils.isEmpty(transformResponseList)) {
+ relations = NodeRelationUtils.createNodeRelations(sources, sinks);
+ } else {
+ relations = NodeRelationUtils.createNodeRelations(inlongStream);
+ }
StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
sortStreamInfos.add(streamInfo);
- // rebuild joinerNode relation
+ // rebuild joinerNode relation if transformResponseList is not empty
NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
}
return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
}
- /**
- * Get Sort GroupInfo of STANDARD inlong group.
- *
- * @see org.apache.inlong.sort.protocol.GroupInfo
- */
- private GroupInfo getStandardGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
- // get source info
- Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
- // get sink info
- Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
-
- // create StreamInfo for Sort protocol
- List<StreamInfo> sortStreamInfos = new ArrayList<>();
- for (InlongStreamInfo inlongStream : streamInfoList) {
- String streamId = inlongStream.getInlongStreamId();
-
- Map<String, StreamField> fieldMap = new HashMap<>();
- inlongStream.getSourceList().forEach(
- source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
-
- List<StreamSource> sources = sourceMap.get(streamId);
- List<StreamSink> sinks = sinkMap.get(streamId);
- StreamInfo sortStream = new StreamInfo(streamId,
- this.createNodesWithoutTransform(sources, sinks, fieldMap),
- NodeRelationUtils.createNodeRelations(sources, sinks));
- sortStreamInfos.add(sortStream);
- }
-
- return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
- }
-
- private List<Node> createNodesWithoutTransform(List<StreamSource> sources, List<StreamSink> sinks,
- Map<String, StreamField> constantFieldMap) {
- List<Node> nodes = Lists.newArrayList();
- nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
- nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
- return nodes;
- }
-
- private List<Node> createNodesWithTransform(List<StreamSource> sources, List<TransformResponse> transformResponses,
+ private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses,
List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
List<Node> nodes = Lists.newArrayList();
nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));