You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/22 02:14:41 UTC
[inlong] branch master updated: [INLONG-5157][Manager] Failed to build Sort config as the relations is empty (#5168)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 390b161d4 [INLONG-5157][Manager] Failed to build Sort config as the relations is empty (#5168)
390b161d4 is described below
commit 390b161d4812d68ccae17546e889692eae82daa5
Author: healchow <he...@gmail.com>
AuthorDate: Fri Jul 22 10:14:37 2022 +0800
[INLONG-5157][Manager] Failed to build Sort config as the relations is empty (#5168)
---
.../service/sort/DefaultSortConfigOperator.java | 103 ++++++++++++++-------
.../manager/service/sort/util/LoadNodeUtils.java | 6 +-
.../service/sort/util/NodeRelationUtils.java | 18 +++-
3 files changed, 86 insertions(+), 41 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
index 38cdf5880..0045dc8e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
@@ -82,7 +82,12 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
GroupInfo configInfo;
// if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source
- configInfo = getGroupInfo(groupInfo, streamInfos);
+ if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+ configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
+ } else {
+ configInfo = this.getNormalGroupInfo(groupInfo, streamInfos);
+ }
+
String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
if (isStream) {
this.addToStreamExt(streamInfos, dataflow);
@@ -95,14 +100,13 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
}
}
- private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+ private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
// get source info
Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
// get sink info
- String groupId = groupInfo.getInlongGroupId();
Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
- List<TransformResponse> transformResponses = transformService.listTransform(groupId, null);
+ List<TransformResponse> transformResponses = transformService.listTransform(groupInfo.getInlongGroupId(), null);
Map<String, List<TransformResponse>> transformMap = transformResponses.stream()
.collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
Collectors.toCollection(ArrayList::new)));
@@ -110,17 +114,19 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
List<StreamInfo> sortStreamInfos = new ArrayList<>();
for (InlongStreamInfo inlongStream : streamInfoList) {
String streamId = inlongStream.getInlongStreamId();
- Map<String, StreamField> constantFieldMap = new HashMap<>();
- inlongStream.getSourceList()
- .forEach(s -> parseConstantFieldMap(s.getSourceName(), s.getFieldList(), constantFieldMap));
+ Map<String, StreamField> fieldMap = new HashMap<>();
+ inlongStream.getSourceList().forEach(
+ source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
List<TransformResponse> transformResponseList = transformMap.get(streamId);
if (CollectionUtils.isNotEmpty(transformResponseList)) {
- transformResponseList
- .forEach(s -> parseConstantFieldMap(s.getTransformName(), s.getFieldList(), constantFieldMap));
+ transformResponseList.forEach(
+ trans -> parseConstantFieldMap(trans.getTransformName(), trans.getFieldList(), fieldMap));
}
- List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
- transformResponseList, sinkMap.get(streamId), constantFieldMap);
- List<NodeRelation> relations = NodeRelationUtils.createNodeRelationsForStream(inlongStream);
+
+ // build a stream info from the nodes and relations
+ List<Node> nodes = this.createNodesWithTransform(sourceMap.get(streamId),
+ transformResponseList, sinkMap.get(streamId), fieldMap);
+ List<NodeRelation> relations = NodeRelationUtils.createNodeRelations(inlongStream);
StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
sortStreamInfos.add(streamInfo);
@@ -131,12 +137,60 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
}
+ /**
+ * Get Sort GroupInfo of normal inlong group.
+ *
+ * @see org.apache.inlong.sort.protocol.GroupInfo
+ */
+ private GroupInfo getNormalGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+ // get source info
+ Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
+ // get sink info
+ Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
+
+ // create StreamInfo for Sort protocol
+ List<StreamInfo> sortStreamInfos = new ArrayList<>();
+ for (InlongStreamInfo inlongStream : streamInfoList) {
+ String streamId = inlongStream.getInlongStreamId();
+
+ Map<String, StreamField> fieldMap = new HashMap<>();
+ inlongStream.getSourceList().forEach(
+ source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
+
+ List<StreamSource> sources = sourceMap.get(streamId);
+ List<StreamSink> sinks = sinkMap.get(streamId);
+ StreamInfo sortStream = new StreamInfo(streamId,
+ this.createNodesWithoutTransform(sources, sinks, fieldMap),
+ NodeRelationUtils.createNodeRelations(sources, sinks));
+ sortStreamInfos.add(sortStream);
+ }
+
+ return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
+ }
+
+ private List<Node> createNodesWithoutTransform(List<StreamSource> sources, List<StreamSink> sinks,
+ Map<String, StreamField> constantFieldMap) {
+ List<Node> nodes = Lists.newArrayList();
+ nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
+ nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
+ return nodes;
+ }
+
+ private List<Node> createNodesWithTransform(List<StreamSource> sources, List<TransformResponse> transformResponses,
+ List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
+ List<Node> nodes = Lists.newArrayList();
+ nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
+ nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
+ nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
+ return nodes;
+ }
+
/**
* Get constant field from stream fields
*
- * @param nodeId The node id
- * @param fields The stream fields
- * @param constantFieldMap The constant field map
+ * @param nodeId node id
+ * @param fields stream fields
+ * @param constantFieldMap constant field map
*/
private void parseConstantFieldMap(String nodeId, List<StreamField> fields,
Map<String, StreamField> constantFieldMap) {
@@ -150,25 +204,6 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
}
}
- private List<Node> createNodesForStream(List<StreamSource> sourceInfos,
- List<TransformResponse> transformResponses, List<StreamSink> streamSinks,
- Map<String, StreamField> constantFieldMap) {
- List<Node> nodes = Lists.newArrayList();
- nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceInfos));
- nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
- nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks, constantFieldMap));
- return nodes;
- }
-
- private List<NodeRelation> createNodeRelationsForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
- NodeRelation relation = new NodeRelation();
- List<String> inputs = sources.stream().map(StreamSource::getSourceName).collect(Collectors.toList());
- List<String> outputs = streamSinks.stream().map(StreamSink::getSinkName).collect(Collectors.toList());
- relation.setInputs(inputs);
- relation.setOutputs(outputs);
- return Lists.newArrayList(relation);
- }
-
/**
* Add config into inlong group ext info
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 49ad68231..88c88e6f6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -84,13 +84,13 @@ public class LoadNodeUtils {
/**
* Create nodes of data load.
*/
- public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks,
- Map<String, StreamField> constantFieldMap) {
+ public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks, Map<String, StreamField> fieldMap) {
if (CollectionUtils.isEmpty(streamSinks)) {
return Lists.newArrayList();
}
return streamSinks.stream()
- .map(s -> LoadNodeUtils.createLoadNode(s, constantFieldMap)).collect(Collectors.toList());
+ .map(sink -> LoadNodeUtils.createLoadNode(sink, fieldMap))
+ .collect(Collectors.toList());
}
/**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
index d5bafbcd5..5beb8ce69 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
@@ -64,9 +64,8 @@ public class NodeRelationUtils {
/**
* Create node relation for the given stream
*/
- public static List<NodeRelation> createNodeRelationsForStream(InlongStreamInfo streamInfo) {
- String tempView = streamInfo.getExtParams();
- if (StringUtils.isEmpty(tempView)) {
+ public static List<NodeRelation> createNodeRelations(InlongStreamInfo streamInfo) {
+ if (StringUtils.isEmpty(streamInfo.getExtParams())) {
log.warn("stream node relation is empty for {}", streamInfo);
return Lists.newArrayList();
}
@@ -79,6 +78,18 @@ public class NodeRelationUtils {
.collect(Collectors.toList());
}
+ /**
+ * Create node relation from the given sources and sinks
+ */
+ public static List<NodeRelation> createNodeRelations(List<StreamSource> sources, List<StreamSink> sinks) {
+ NodeRelation relation = new NodeRelation();
+ List<String> inputs = sources.stream().map(StreamSource::getSourceName).collect(Collectors.toList());
+ List<String> outputs = sinks.stream().map(StreamSink::getSinkName).collect(Collectors.toList());
+ relation.setInputs(inputs);
+ relation.setOutputs(outputs);
+ return Lists.newArrayList(relation);
+ }
+
/**
* Optimize relation of node, JoinerRelation must be rebuilt.
*/
@@ -110,7 +121,6 @@ public class NodeRelationUtils {
String nodeName = outputs.get(0);
if (joinNodes.get(nodeName) != null) {
TransformDefinition transformDefinition = transformTypeMap.get(nodeName);
- TransformNode transformNode = joinNodes.get(nodeName);
joinRelations.add(getNodeRelation((JoinerDefinition) transformDefinition, relation));
shipIterator.remove();
}