You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/31 03:55:06 UTC
[incubator-inlong] branch master updated: [INLONG-4445][Manager] Change the relationship to relation to adapt the Sort protocol (#4446)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1f1cf8c0e [INLONG-4445][Manager] Change the relationship to relation to adapt the Sort protocol (#4446)
1f1cf8c0e is described below
commit 1f1cf8c0ec0c3c0cf9f464efa2466a8bd82eb83e
Author: healzhou <he...@gmail.com>
AuthorDate: Tue May 31 11:55:02 2022 +0800
[INLONG-4445][Manager] Change the relationship to relation to adapt the Sort protocol (#4446)
* Change the relationship to relation to adapt the Sort protocol
* Rename some variables and methods
---
.../manager/client/api/impl/InlongStreamImpl.java | 52 +++++++++++-----------
.../inlong/manager/common/auth/Authentication.java | 2 +-
.../inlong/manager/common/enums/GroupMode.java | 2 +-
.../inlong/manager/common/enums/MetaFieldType.java | 12 ++---
.../inlong/manager/common/enums/TransformType.java | 4 +-
.../manager/common/pojo/sort/BaseSortConf.java | 2 +-
.../common/pojo/source/kafka/KafkaOffset.java | 2 +-
...deRelationship.java => StreamNodeRelation.java} | 8 ++--
.../manager/common/pojo/stream/StreamPipeline.java | 16 +++----
.../pojo/transform/filter/FilterDefinition.java | 31 ++++++-------
.../transform/splitter/SplitterDefinition.java | 22 ++++-----
.../common/pojo/stream/StreamPipelineTest.java | 10 ++---
.../service/core/impl/ConsumptionServiceImpl.java | 4 +-
.../service/core/impl/InlongStreamServiceImpl.java | 19 ++++----
.../service/sink/StreamSinkServiceImpl.java | 2 -
.../service/sort/CreateSortConfigListenerV2.java | 14 +++---
.../sort/CreateStreamSortConfigListener.java | 5 ++-
.../service/sort/light/LightGroupSortListener.java | 8 ++--
.../service/sort/util/ExtractNodeUtils.java | 2 +-
.../service/sort/util/FieldRelationUtils.java | 16 +++----
.../service/sort/util/FilterFunctionUtils.java | 4 +-
.../manager/service/sort/util/LoadNodeUtils.java | 20 ++++-----
...lationShipUtils.java => NodeRelationUtils.java} | 43 +++++++++---------
.../service/sort/util/TransformNodeUtils.java | 4 +-
.../NewConsumptionWorkflowDefinition.java | 2 +-
.../workflow/group/NewGroupWorkflowDefinition.java | 2 +-
.../listener/GroupUpdateCompleteListener.java | 7 ++-
.../listener/StreamUpdateCompleteListener.java | 7 ++-
28 files changed, 154 insertions(+), 168 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 1b85538ce..f73d56a70 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -36,7 +36,7 @@ import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
-import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelationship;
+import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelation;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
@@ -213,28 +213,28 @@ public class InlongStreamImpl implements InlongStream {
public StreamPipeline createPipeline() {
StreamPipeline streamPipeline = new StreamPipeline();
if (MapUtils.isEmpty(streamTransforms)) {
- StreamNodeRelationship relationship = new StreamNodeRelationship();
- relationship.setInputNodes(streamSources.keySet());
- relationship.setOutputNodes(streamSinks.keySet());
- streamPipeline.setPipeline(Lists.newArrayList(relationship));
+ StreamNodeRelation relation = new StreamNodeRelation();
+ relation.setInputNodes(streamSources.keySet());
+ relation.setOutputNodes(streamSinks.keySet());
+ streamPipeline.setPipeline(Lists.newArrayList(relation));
return streamPipeline;
}
- Map<Set<String>, List<StreamNodeRelationship>> relationshipMap = Maps.newHashMap();
+ Map<Set<String>, List<StreamNodeRelation>> relationMap = Maps.newHashMap();
// Check preNodes
for (StreamTransform streamTransform : streamTransforms.values()) {
String transformName = streamTransform.getTransformName();
Set<String> preNodes = streamTransform.getPreNodes();
- StreamNodeRelationship relationship = new StreamNodeRelationship();
- relationship.setInputNodes(preNodes);
- relationship.setOutputNodes(Sets.newHashSet(transformName));
+ StreamNodeRelation relation = new StreamNodeRelation();
+ relation.setInputNodes(preNodes);
+ relation.setOutputNodes(Sets.newHashSet(transformName));
for (String preNode : preNodes) {
StreamTransform transform = streamTransforms.get(preNode);
if (transform != null) {
transform.addPost(transformName);
}
}
- relationshipMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relationship);
+ relationMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relation);
}
// Check postNodes
for (StreamTransform streamTransform : streamTransforms.values()) {
@@ -248,29 +248,29 @@ public class InlongStreamImpl implements InlongStream {
}
}
if (CollectionUtils.isNotEmpty(sinkSet)) {
- StreamNodeRelationship relationship = new StreamNodeRelationship();
+ StreamNodeRelation relation = new StreamNodeRelation();
Set<String> preNodes = Sets.newHashSet(transformName);
- relationship.setInputNodes(preNodes);
- relationship.setOutputNodes(sinkSet);
- relationshipMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relationship);
+ relation.setInputNodes(preNodes);
+ relation.setOutputNodes(sinkSet);
+ relationMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relation);
}
}
- List<StreamNodeRelationship> relationships = Lists.newArrayList();
- // Merge StreamNodeRelationship with same preNodes
- for (Map.Entry<Set<String>, List<StreamNodeRelationship>> entry : relationshipMap.entrySet()) {
- List<StreamNodeRelationship> unmergedRelationships = entry.getValue();
- if (unmergedRelationships.size() == 1) {
- relationships.add(unmergedRelationships.get(0));
+ List<StreamNodeRelation> relations = Lists.newArrayList();
+ // Merge StreamNodeRelation with same preNodes
+ for (Map.Entry<Set<String>, List<StreamNodeRelation>> entry : relationMap.entrySet()) {
+ List<StreamNodeRelation> unmergedRelations = entry.getValue();
+ if (unmergedRelations.size() == 1) {
+ relations.add(unmergedRelations.get(0));
} else {
- StreamNodeRelationship mergedRelationship = unmergedRelationships.get(0);
- for (int index = 1; index < unmergedRelationships.size(); index++) {
- StreamNodeRelationship unmergedRelationship = unmergedRelationships.get(index);
- unmergedRelationship.getOutputNodes().forEach(mergedRelationship::addOutputNode);
+ StreamNodeRelation mergedRelation = unmergedRelations.get(0);
+ for (int index = 1; index < unmergedRelations.size(); index++) {
+ StreamNodeRelation unmergedRelation = unmergedRelations.get(index);
+ unmergedRelation.getOutputNodes().forEach(mergedRelation::addOutputNode);
}
- relationships.add(mergedRelationship);
+ relations.add(mergedRelation);
}
}
- streamPipeline.setPipeline(relationships);
+ streamPipeline.setPipeline(relations);
Pair<Boolean, Pair<String, String>> circleState = streamPipeline.hasCircle();
if (circleState.getLeft()) {
Pair<String, String> circleNodes = circleState.getRight();
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
index 9691d44c1..6231f91f1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
@@ -42,7 +42,7 @@ public interface Authentication {
return authType;
}
}
- throw new IllegalArgumentException(String.format("Unsupported authType=%s for Inlong", type));
+ throw new IllegalArgumentException(String.format("Unsupported authType=%s", type));
}
@Override
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
index 433791d18..1cd2cd414 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
@@ -49,7 +49,7 @@ public enum GroupMode {
return groupMode;
}
}
- throw new IllegalArgumentException(String.format("Unsupported group mode=%s for Inlong", mode));
+ throw new IllegalArgumentException(String.format("Unsupported group mode=%s", mode));
}
public static GroupMode parseGroupMode(InlongGroupInfo groupInfo) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
index 6e9d8f49f..3a52e41a0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
@@ -25,7 +25,7 @@ public enum MetaFieldType {
/**
* database
*/
- DATABASE("database", "meta field database used in canal json or mysql binlong and so on"),
+ DATABASE("database", "meta field database used in canal json or mysql binlog and so on"),
/**
* processing_time
@@ -35,27 +35,27 @@ public enum MetaFieldType {
/**
* data_time
*/
- DATA_TIME("data_time", "meta field data_time used in canal json or mysql binlong and so on"),
+ DATA_TIME("data_time", "meta field data_time used in canal json or mysql binlog and so on"),
/**
* table
*/
- TABLE("table", "meta field table used in canal json or mysql binlong and so on"),
+ TABLE("table", "meta field table used in canal json or mysql binlog and so on"),
/**
* event_time
*/
- EVENT_TIME("event_time", "meta field event_time used in canal json or mysql binlong and so on"),
+ EVENT_TIME("event_time", "meta field event_time used in canal json or mysql binlog and so on"),
/**
* is_ddl
*/
- IS_DDL("is_ddl", "meta field is_ddl used in canal json or mysql binlong and so on"),
+ IS_DDL("is_ddl", "meta field is_ddl used in canal json or mysql binlog and so on"),
/**
* event_type
*/
- EVENT_TYPE("event_type", "meta field event_type used in canal json or mysql binlong and so on"),
+ EVENT_TYPE("event_type", "meta field event_type used in canal json or mysql binlog and so on"),
/**
* data
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
index b4d326b91..deaa62a74 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
@@ -49,7 +49,7 @@ public enum TransformType {
JOINER("joiner");
@Getter
- private String type;
+ private final String type;
TransformType(String type) {
this.type = type;
@@ -61,7 +61,7 @@ public enum TransformType {
return transformType;
}
}
- throw new IllegalArgumentException(String.format("Unsupported transform=%s for Inlong", type));
+ throw new IllegalArgumentException(String.format("Unsupported transformType=%s", type));
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java
index 3187aa6b7..361aff738 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java
@@ -46,7 +46,7 @@ public abstract class BaseSortConf {
return sortType;
}
}
- throw new IllegalArgumentException(String.format("Unsupported type=%s for Inlong", type));
+ throw new IllegalArgumentException(String.format("Unsupported sortType=%s", type));
}
public String getType() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaOffset.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaOffset.java
index 539291947..1a66aa532 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaOffset.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaOffset.java
@@ -43,7 +43,7 @@ public enum KafkaOffset {
return dataFormat;
}
}
- throw new IllegalArgumentException(String.format("Unsupported KafkaOffset=%s for Inlong", name));
+ throw new IllegalArgumentException(String.format("Unsupported KafkaOffset=%s", name));
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelation.java
similarity index 86%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelation.java
index 791edb987..bfbf38ca5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelation.java
@@ -24,20 +24,20 @@ import org.apache.inlong.manager.common.util.Preconditions;
import java.util.Set;
/**
- * Stream node relationship info, including input node name list, output node name list.
+ * Stream node relation info, including input node name list, output node name list.
*/
@Data
-public class StreamNodeRelationship {
+public class StreamNodeRelation {
private Set<String> inputNodes;
private Set<String> outputNodes;
- public StreamNodeRelationship() {
+ public StreamNodeRelation() {
this(Sets.newHashSet(), Sets.newHashSet());
}
- public StreamNodeRelationship(Set<String> inputNodes, Set<String> outputNodes) {
+ public StreamNodeRelation(Set<String> inputNodes, Set<String> outputNodes) {
this.inputNodes = inputNodes;
this.outputNodes = outputNodes;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
index 6efd61ec5..24d1a1cee 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
@@ -33,24 +33,24 @@ import java.util.Queue;
import java.util.Set;
/**
- * Stream pipeline, save stream node relationship list.
+ * Stream pipeline, save stream node relation list.
*/
@Data
public class StreamPipeline {
- private List<StreamNodeRelationship> pipeline;
+ private List<StreamNodeRelation> pipeline;
public StreamPipeline() {
this(Lists.newArrayList());
}
- public StreamPipeline(List<StreamNodeRelationship> pipeline) {
+ public StreamPipeline(List<StreamNodeRelation> pipeline) {
Preconditions.checkNotNull(pipeline, "Pipeline should not be null");
this.pipeline = pipeline;
}
- public void addRelationship(StreamNodeRelationship relationship) {
- pipeline.add(relationship);
+ public void addRelation(StreamNodeRelation relation) {
+ pipeline.add(relation);
}
/**
@@ -58,9 +58,9 @@ public class StreamPipeline {
*/
public Pair<Boolean, Pair<String, String>> hasCircle() {
Map<String, Set<String>> priorityMap = Maps.newHashMap();
- for (StreamNodeRelationship relationship : pipeline) {
- Set<String> inputNodes = relationship.getInputNodes();
- Set<String> outputNodes = relationship.getOutputNodes();
+ for (StreamNodeRelation relation : pipeline) {
+ Set<String> inputNodes = relation.getInputNodes();
+ Set<String> outputNodes = relation.getOutputNodes();
for (String inputNode : inputNodes) {
for (String outputNode : outputNodes) {
priorityMap.computeIfAbsent(inputNode, key -> Sets.newHashSet()).add(outputNode);
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
index d01e2ee08..248347d15 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
@@ -29,12 +29,23 @@ import java.util.List;
/**
* A class to define operation to filter stream records by different modes.
- * Rule mode is more recommended then script mode
+ * Rule mode is more recommended than script mode.
*/
@Data
@EqualsAndHashCode(callSuper = true)
public class FilterDefinition extends TransformDefinition {
+ /**
+ * Strategy for Filter transform
+ */
+ private FilterStrategy filterStrategy;
+ /**
+ * Mode for Filter transform
+ */
+ private FilterMode filterMode;
+ private List<FilterRule> filterRules;
+ private ScriptBase scriptBase;
+
public FilterDefinition(FilterStrategy filterStrategy, List<FilterRule> filterRules) {
this.transformType = TransformType.FILTER;
this.filterStrategy = filterStrategy;
@@ -59,16 +70,6 @@ public class FilterDefinition extends TransformDefinition {
RULE, SCRIPT
}
- /**
- * Strategy for Filter transform
- */
- private FilterStrategy filterStrategy;
-
- /**
- * Mode for Filter transform
- */
- private FilterMode filterMode;
-
@Data
@AllArgsConstructor
public static class TargetValue {
@@ -84,8 +85,8 @@ public class FilterDefinition extends TransformDefinition {
}
/**
- * Filter rule is about relationship between sourceField and targetValue;
- * such as 'a >= b' or 'a is not null'
+ * Filter rule is about relation between sourceField and targetValue.
+ * Such as 'a >= b' or 'a is not null'
*/
@Data
@AllArgsConstructor
@@ -100,8 +101,6 @@ public class FilterDefinition extends TransformDefinition {
private RuleRelation relationWithPost;
}
- private List<FilterRule> filterRules;
-
@Data
@AllArgsConstructor
public static class ScriptBase {
@@ -110,6 +109,4 @@ public class FilterDefinition extends TransformDefinition {
private String script;
}
-
- private ScriptBase scriptBase;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
index bcaa33503..62f4fce3f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
@@ -31,10 +31,15 @@ import java.util.List;
* A class to define operation to split fields according to SplitRule defined.
*/
@Data
-@EqualsAndHashCode(callSuper = true)
@Builder
+@EqualsAndHashCode(callSuper = true)
public class SplitterDefinition extends TransformDefinition {
+ /**
+ * Split rules for transform;
+ */
+ private List<SplitRule> splitRules;
+
public SplitterDefinition(List<SplitRule> splitRules) {
this.transformType = TransformType.SPLITTER;
this.splitRules = splitRules;
@@ -42,31 +47,26 @@ public class SplitterDefinition extends TransformDefinition {
/**
* SplitterRule is aim to define a splitter action below:
- * SourceField will be splitted to targetFields by seperator
+ * SourceField will be split to targetFields by separator
*/
@Data
@AllArgsConstructor
public static class SplitRule {
/**
- * Field to split;
+ * Field to split
*/
private StreamField sourceField;
/**
- * String seperator to split sourceField;
+ * String separator to split sourceField
*/
- private String seperator;
+ private String separator;
/**
- * Fields generated when sourceField is splitted
+ * Fields generated when sourceField is split.
* Use sourceName_0, sourceName_1, sourceName_2 if not set
*/
private List<String> targetFields;
}
-
- /**
- * Split rules for transform;
- */
- private List<SplitRule> splitRules;
}
diff --git a/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
index af29276b5..3e8d20b4e 100644
--- a/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
+++ b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
@@ -30,11 +30,11 @@ public class StreamPipelineTest {
@Test
public void testCheckHasCircle() {
StreamPipeline streamPipeline = new StreamPipeline();
- streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("A", "B"), Sets.newHashSet("C")));
- streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("C"), Sets.newHashSet("D")));
- streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("D"), Sets.newHashSet("E", "F")));
- streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("F"), Sets.newHashSet("G")));
- streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("E"), Sets.newHashSet("H", "C")));
+ streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("A", "B"), Sets.newHashSet("C")));
+ streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("C"), Sets.newHashSet("D")));
+ streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("D"), Sets.newHashSet("E", "F")));
+ streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("F"), Sets.newHashSet("G")));
+ streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("E"), Sets.newHashSet("H", "C")));
Pair<Boolean, Pair<String, String>> circleState = streamPipeline.hasCircle();
Assert.assertTrue(circleState.getLeft());
Assert.assertTrue(Sets.newHashSet("E","C").contains(circleState.getRight().getLeft()));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 0030ead9b..b6a7cf385 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -405,8 +405,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
// Tube’s topic is the inlong group level, one inlong group, one Tube topic
MQType mqType = MQType.forType(topicVO.getMqType());
if (mqType == MQType.TUBE) {
- String bizTopic = topicVO.getMqResource();
- Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
+ String mqResource = topicVO.getMqResource();
+ Preconditions.checkTrue(mqResource == null || mqResource.equals(info.getTopic()),
"topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
} else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
// Pulsar's topic is the inlong stream level.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 3ca608413..03da93075 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -104,7 +104,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be added
- checkBizIsTempStatus(groupId);
+ checkGroupStatusIsTemp(groupId);
// The streamId under the same groupId cannot be repeated
Integer count = streamMapper.selectExistByIdentifier(groupId, streamId);
@@ -170,7 +170,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
.collect(Collectors.groupingBy(InlongStreamExtInfo::getInlongStreamId,
HashMap::new,
Collectors.toCollection(ArrayList::new)));
- streamList.stream().forEach(streamInfo -> {
+ streamList.forEach(streamInfo -> {
String streamId = streamInfo.getInlongStreamId();
List<StreamField> fieldInfos = streamFieldMap.get(streamId);
streamInfo.setFieldList(fieldInfos);
@@ -246,7 +246,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be modified
- InlongGroupEntity inlongGroupEntity = this.checkBizIsTempStatus(groupId);
+ InlongGroupEntity inlongGroupEntity = this.checkGroupStatusIsTemp(groupId);
// Make sure the stream was exists
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
@@ -280,7 +280,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
- this.checkBizIsTempStatus(groupId);
+ this.checkGroupStatusIsTemp(groupId);
InlongStreamEntity entity = streamMapper.selectByIdentifier(groupId, streamId);
if (entity == null) {
@@ -322,7 +322,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
- this.checkBizIsTempStatus(groupId);
+ this.checkGroupStatusIsTemp(groupId);
List<InlongStreamEntity> entityList = streamMapper.selectByGroupId(groupId);
if (CollectionUtils.isEmpty(entityList)) {
@@ -377,9 +377,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
InlongStreamRequest streamRequest = fullStreamRequest.getStreamInfo();
Preconditions.checkNotNull(streamRequest, "inlong stream info is empty");
- // Check whether it can be added: check by lower-level specific services
- // this.checkBizIsTempStatus(streamInfo.getInlongGroupId());
-
// Save inlong stream
save(streamRequest, operator);
@@ -413,7 +410,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
InlongStreamRequest firstStream = fullStreamRequestList.get(0).getStreamInfo();
Preconditions.checkNotNull(firstStream, "inlong stream info is empty");
String groupId = firstStream.getInlongGroupId();
- this.checkBizIsTempStatus(groupId);
+ this.checkGroupStatusIsTemp(groupId);
// This bulk save is only used when creating or editing inlong group after approval is rejected.
// To ensure data consistency, you need to physically delete all associated data and then add
@@ -627,7 +624,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
}
List<InlongStreamExtEntity> entityList = CommonBeanUtils.copyListProperties(exts, InlongStreamExtEntity::new);
- entityList.stream().forEach(streamEntity -> {
+ entityList.forEach(streamEntity -> {
streamEntity.setInlongGroupId(groupId);
streamEntity.setInlongStreamId(streamId);
});
@@ -641,7 +638,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
* @param groupId inlong group id
* @return inlong group entity
*/
- private InlongGroupEntity checkBizIsTempStatus(String groupId) {
+ private InlongGroupEntity checkGroupStatusIsTemp(String groupId) {
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
Preconditions.checkNotNull(entity, "groupId is invalid");
// Add/modify/delete is not allowed under certain inlong group status
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index bdf3168f9..c99c94b3b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -226,8 +226,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
public Boolean delete(Integer id, String operator) {
LOGGER.info("begin to delete sink by id={}", id);
Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
- // Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
-
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index ce26c9140..262a0e7e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -47,6 +47,7 @@ import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.springframework.beans.factory.annotation.Autowired;
@@ -127,7 +128,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
List<InlongStreamInfo> streamInfoList) {
MQType mqType = MQType.forType(groupInfo.getMqType());
if (mqType != MQType.PULSAR) {
- String errMsg = String.format("Unsupported MqType={%s} for Inlong", mqType);
+ String errMsg = String.format("Unsupported mqType={%s}", mqType);
log.error(errMsg);
throw new WorkflowListenerException(errMsg);
}
@@ -152,7 +153,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
}
}
- pulsarSource.setScanStartupMode("earliest");
+ pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
pulsarSource.setFieldList(streamInfo.getFieldList());
sourceMap.computeIfAbsent(streamInfo.getInlongStreamId(), key -> Lists.newArrayList())
.add(pulsarSource);
@@ -167,13 +168,10 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
return nodes;
}
- private List<NodeRelation> createNodeRelationsForStream(
- List<StreamSource> sources, List<StreamSink> streamSinks) {
+ private List<NodeRelation> createNodeRelationsForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
NodeRelation relation = new NodeRelation();
- List<String> inputs = sources.stream().map(StreamSource::getSourceName)
- .collect(Collectors.toList());
- List<String> outputs = streamSinks.stream().map(StreamSink::getSinkName)
- .collect(Collectors.toList());
+ List<String> inputs = sources.stream().map(StreamSource::getSourceName).collect(Collectors.toList());
+ List<String> outputs = streamSinks.stream().map(StreamSink::getSinkName).collect(Collectors.toList());
relation.setInputs(inputs);
relation.setOutputs(outputs);
return Lists.newArrayList(relation);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 27e3b141a..02e848de2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -47,6 +47,7 @@ import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.springframework.beans.factory.annotation.Autowired;
@@ -120,7 +121,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
private List<StreamSource> createPulsarSources(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo) {
MQType mqType = MQType.forType(groupInfo.getMqType());
if (mqType != MQType.PULSAR) {
- String errMsg = String.format("Unsupported MqType={%s} for Inlong", mqType);
+ String errMsg = String.format("Unsupported mqType={%s}", mqType);
log.error(errMsg);
throw new WorkflowListenerException(errMsg);
}
@@ -143,7 +144,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
pulsarSource.setPrimaryKey(((KafkaSource) source).getPrimaryKey());
}
}
- pulsarSource.setScanStartupMode("earliest");
+ pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
pulsarSource.setFieldList(streamInfo.getFieldList());
return Lists.newArrayList(pulsarSource);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
index e018d69b8..6308b70cd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
@@ -31,7 +31,7 @@ import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
-import org.apache.inlong.manager.service.sort.util.NodeRelationShipUtils;
+import org.apache.inlong.manager.service.sort.util.NodeRelationUtils;
import org.apache.inlong.manager.service.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.transform.StreamTransformService;
@@ -122,12 +122,12 @@ public class LightGroupSortListener implements SortOperateListener {
List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
transformMap.get(streamId), sinkMap.get(streamId));
StreamInfo streamInfo = new StreamInfo(streamId, nodes,
- NodeRelationShipUtils.createNodeRelationShipsForStream(stream));
+ NodeRelationUtils.createNodeRelationsForStream(stream));
streamInfos.add(streamInfo);
- // Rebuild joinerNode relationship
+ // Rebuild joinerNode relation
List<TransformResponse> transformResponseList = transformMap.get(streamId);
- NodeRelationShipUtils.optimizeNodeRelationShips(streamInfo, transformResponseList);
+ NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
}
return new GroupInfo(groupInfo.getInlongGroupId(), streamInfos);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 83cccc84e..9447b156a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -180,7 +180,7 @@ public class ExtractNodeUtils {
throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
}
KafkaOffset kafkaOffset = KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
- KafkaScanStartupMode startupMode = null;
+ KafkaScanStartupMode startupMode;
switch (kafkaOffset) {
case EARLIEST:
startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
index 301914e73..5793db886 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
@@ -77,7 +77,7 @@ public class FieldRelationUtils {
return createJoinerFieldRelations(fieldList, transformName);
default:
throw new UnsupportedOperationException(
- String.format("Unsupported transformType=%s for Inlong", transformType));
+ String.format("Unsupported transformType=%s", transformType));
}
}
@@ -99,8 +99,7 @@ public class FieldRelationUtils {
/**
* Create relation of fields in join function.
*/
- private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList,
- String transformName) {
+ private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, String transformName) {
return fieldList.stream()
.map(streamField -> {
FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
@@ -139,8 +138,7 @@ public class FieldRelationUtils {
/**
* Create relation of fields in replace function.
*/
- private static List<FieldRelation> createReplacerFieldRelations(
- List<StreamField> fieldList, String transformName,
+ private static List<FieldRelation> createReplacerFieldRelations(List<StreamField> fieldList, String transformName,
StringReplacerDefinition replacerDefinition, String preNodes) {
Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
String preNode = preNodes.split(",")[0];
@@ -183,7 +181,7 @@ public class FieldRelationUtils {
* Parse rule of replacer.
*/
private static FieldRelation parseReplaceRule(ReplaceRule replaceRule, Set<String> replaceFields,
- String transformName, String preNode) {
+ String transformName, String preNode) {
StreamField sourceField = replaceRule.getSourceField();
final String fieldName = sourceField.getFieldName();
String regex = replaceRule.getRegex();
@@ -209,16 +207,16 @@ public class FieldRelationUtils {
* Parse rule of split.
*/
private static List<FieldRelation> parseSplitRule(SplitRule splitRule, Set<String> splitFields,
- String transformName, String preNode) {
+ String transformName, String preNode) {
StreamField sourceField = splitRule.getSourceField();
FieldInfo fieldInfo = FieldInfoUtils.parseStreamField(sourceField);
fieldInfo.setNodeId(preNode);
- String seperator = splitRule.getSeperator();
+ String separator = splitRule.getSeparator();
List<String> targetSources = splitRule.getTargetFields();
List<FieldRelation> splitRelations = Lists.newArrayList();
for (int index = 0; index < targetSources.size(); index++) {
SplitIndexFunction splitIndexFunction = new SplitIndexFunction(
- fieldInfo, new StringConstantParam(seperator), new ConstantParam(index));
+ fieldInfo, new StringConstantParam(separator), new ConstantParam(index));
FieldInfo targetFieldInfo = new FieldInfo(
targetSources.get(index), transformName, FieldInfoUtils.convertFieldFormat(FieldType.STRING.name())
);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
index 2bc91662e..53c5dbfee 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
@@ -77,7 +77,7 @@ public class FilterFunctionUtils {
return Lists.newArrayList();
default:
throw new UnsupportedOperationException(
- String.format("Unsupported transformType=%s for Inlong", transformType));
+ String.format("Unsupported transformType=%s", transformType));
}
}
@@ -129,7 +129,7 @@ public class FilterFunctionUtils {
return null;
default:
throw new UnsupportedOperationException(
- String.format("Unsupported transformType=%s for Inlong", transformType));
+ String.format("Unsupported transformType=%s", transformType));
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index dd8191703..ffa9054cd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -97,7 +97,7 @@ public class LoadNodeUtils {
List<FieldInfo> fieldInfos = fieldList.stream()
.map(field -> FieldInfoUtils.parseSinkFieldInfo(field, name))
.collect(Collectors.toList());
- List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
+ List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
Map<String, String> properties = kafkaSink.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
Integer sinkParallelism = null;
@@ -129,7 +129,7 @@ public class LoadNodeUtils {
return new KafkaLoadNode(id,
name,
fieldInfos,
- fieldRelationships,
+ fieldRelations,
Lists.newArrayList(),
null,
topicName,
@@ -154,7 +154,7 @@ public class LoadNodeUtils {
List<FieldInfo> fields = fieldList.stream()
.map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
.collect(Collectors.toList());
- List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
+ List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
Map<String, String> properties = hiveSink.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
List<FieldInfo> partitionFields = Lists.newArrayList();
@@ -168,7 +168,7 @@ public class LoadNodeUtils {
id,
name,
fields,
- fieldRelationships,
+ fieldRelations,
Lists.newArrayList(),
null,
null,
@@ -193,14 +193,14 @@ public class LoadNodeUtils {
List<FieldInfo> fields = fieldList.stream()
.map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
.collect(Collectors.toList());
- List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
+ List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
Map<String, String> properties = hbaseSink.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
return new HbaseLoadNode(
id,
name,
fields,
- fieldRelationships,
+ fieldRelations,
Lists.newArrayList(),
null,
null,
@@ -225,10 +225,10 @@ public class LoadNodeUtils {
List<FieldInfo> fields = fieldList.stream()
.map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
.collect(Collectors.toList());
- List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
+ List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
return new PostgresLoadNode(postgresSink.getSinkName(),
postgresSink.getSinkName(),
- fields, fieldRelationships, null, null, 1,
+ fields, fieldRelations, null, null, 1,
null, postgresSink.getJdbcUrl(), postgresSink.getUsername(),
postgresSink.getPassword(),
postgresSink.getDbName() + "." + postgresSink.getTableName(),
@@ -244,9 +244,9 @@ public class LoadNodeUtils {
List<FieldInfo> fields = sinkFields.stream()
.map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
.collect(Collectors.toList());
- List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
+ List<FieldRelation> fieldRelations = parseSinkFields(sinkFields, name);
return new ClickHouseLoadNode(name, name,
- fields, fieldRelationShips, null, null, 1,
+ fields, fieldRelations, null, null, 1,
null, ckSink.getTableName(),
ckSink.getJdbcUrl(),
ckSink.getUsername(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
similarity index 83%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
index b9791ea64..d5bafbcd5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
@@ -56,33 +56,33 @@ import java.util.Map;
import java.util.stream.Collectors;
/**
- * Util for creat node relationship.
+ * Util for create node relation.
*/
@Slf4j
-public class NodeRelationShipUtils {
+public class NodeRelationUtils {
/**
- * Create node relationship for the given stream
+ * Create node relation for the given stream
*/
- public static List<NodeRelation> createNodeRelationShipsForStream(InlongStreamInfo streamInfo) {
+ public static List<NodeRelation> createNodeRelationsForStream(InlongStreamInfo streamInfo) {
String tempView = streamInfo.getExtParams();
if (StringUtils.isEmpty(tempView)) {
- log.warn("StreamNodeRelationShip is empty for Stream={}", streamInfo);
+ log.warn("stream node relation is empty for {}", streamInfo);
return Lists.newArrayList();
}
StreamPipeline pipeline = StreamParseUtils.parseStreamPipeline(streamInfo.getExtParams(),
streamInfo.getInlongStreamId());
return pipeline.getPipeline().stream()
- .map(nodeRelationship -> new NodeRelation(
- Lists.newArrayList(nodeRelationship.getInputNodes()),
- Lists.newArrayList(nodeRelationship.getOutputNodes())))
+ .map(nodeRelation -> new NodeRelation(
+ Lists.newArrayList(nodeRelation.getInputNodes()),
+ Lists.newArrayList(nodeRelation.getOutputNodes())))
.collect(Collectors.toList());
}
/**
- * Optimize relationship of node, JoinerRelationship must be rebuilt.
+ * Optimize relation of node, JoinerRelation must be rebuilt.
*/
- public static void optimizeNodeRelationShips(StreamInfo streamInfo, List<TransformResponse> transformResponses) {
+ public static void optimizeNodeRelation(StreamInfo streamInfo, List<TransformResponse> transformResponses) {
if (CollectionUtils.isEmpty(transformResponses)) {
return;
}
@@ -100,27 +100,26 @@ public class NodeRelationShipUtils {
return transformDefinition.getTransformType() == TransformType.JOINER;
}).collect(Collectors.toMap(TransformNode::getName, transformNode -> transformNode));
- List<NodeRelation> relationships = streamInfo.getRelations();
- Iterator<NodeRelation> shipIterator = relationships.listIterator();
- List<NodeRelation> joinRelationships = Lists.newArrayList();
+ List<NodeRelation> relations = streamInfo.getRelations();
+ Iterator<NodeRelation> shipIterator = relations.listIterator();
+ List<NodeRelation> joinRelations = Lists.newArrayList();
while (shipIterator.hasNext()) {
- NodeRelation relationship = shipIterator.next();
- List<String> outputs = relationship.getOutputs();
+ NodeRelation relation = shipIterator.next();
+ List<String> outputs = relation.getOutputs();
if (outputs.size() == 1) {
String nodeName = outputs.get(0);
if (joinNodes.get(nodeName) != null) {
TransformDefinition transformDefinition = transformTypeMap.get(nodeName);
TransformNode transformNode = joinNodes.get(nodeName);
- joinRelationships.add(createNodeRelationShip((JoinerDefinition) transformDefinition, relationship));
+ joinRelations.add(getNodeRelation((JoinerDefinition) transformDefinition, relation));
shipIterator.remove();
}
}
}
- relationships.addAll(joinRelationships);
+ relations.addAll(joinRelations);
}
- private static NodeRelation createNodeRelationShip(JoinerDefinition joinerDefinition,
- NodeRelation nodeRelationship) {
+ private static NodeRelation getNodeRelation(JoinerDefinition joinerDefinition, NodeRelation nodeRelation) {
JoinMode joinMode = joinerDefinition.getJoinMode();
String leftNode = getNodeName(joinerDefinition.getLeftNode());
String rightNode = getNodeName(joinerDefinition.getRightNode());
@@ -143,11 +142,11 @@ public class NodeRelationShipUtils {
joinConditions.put(rightNode, filterFunctions);
switch (joinMode) {
case LEFT_JOIN:
- return new LeftOuterJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
+ return new LeftOuterJoinNodeRelation(preNodes, nodeRelation.getOutputs(), joinConditions);
case INNER_JOIN:
- return new InnerJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
+ return new InnerJoinNodeRelation(preNodes, nodeRelation.getOutputs(), joinConditions);
case RIGHT_JOIN:
- return new RightOuterJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
+ return new RightOuterJoinNodeRelation(preNodes, nodeRelation.getOutputs(), joinConditions);
default:
throw new IllegalArgumentException(String.format("Unsupported join mode=%s for inlong", joinMode));
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index 4f480ac32..cbf831cab 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -63,7 +63,7 @@ public class TransformNodeUtils {
* Create distinct node based on deDuplicationDefinition
*/
public static DistinctNode createDistinctNode(DeDuplicationDefinition deDuplicationDefinition,
- TransformResponse transformResponse) {
+ TransformResponse transformResponse) {
List<StreamField> streamFields = deDuplicationDefinition.getDupFields();
List<FieldInfo> distinctFields = streamFields.stream()
.map(FieldInfoUtils::parseStreamField)
@@ -71,7 +71,7 @@ public class TransformNodeUtils {
StreamField timingField = deDuplicationDefinition.getTimingField();
FieldInfo orderField = FieldInfoUtils.parseStreamField(timingField);
DeDuplicationStrategy deDuplicationStrategy = deDuplicationDefinition.getDeDuplicationStrategy();
- OrderDirection orderDirection = null;
+ OrderDirection orderDirection;
switch (deDuplicationStrategy) {
case RESERVE_LAST:
orderDirection = OrderDirection.DESC;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
index 0c23c4314..e237344d9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
@@ -105,7 +105,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
adminUserTask.addListener(consumptionPassTaskListener);
process.addTask(adminUserTask);
- // Set order relationship
+ // Set order relation
startEvent.addNext(groupOwnerUserTask);
groupOwnerUserTask.addNext(adminUserTask);
adminUserTask.addNext(endEvent);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
index 245d1a5e4..eee0adc93 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
@@ -87,7 +87,7 @@ public class NewGroupWorkflowDefinition implements WorkflowDefinition {
adminUserTask.addListener(groupAfterApprovedListener);
process.addTask(adminUserTask);
- // Configuration order relationship
+ // Configuration order relation
startEvent.addNext(adminUserTask);
// If you need another approval process, you can add it here
adminUserTask.addNext(endEvent);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
index 974520e32..0e35ff280 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
@@ -49,10 +49,10 @@ public class GroupUpdateCompleteListener implements ProcessEventListener {
public ListenerResult listen(WorkflowContext context) throws Exception {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
String operator = context.getOperator();
- GroupOperateType groupOperateType = form.getGroupOperateType();
+ GroupOperateType operateType = form.getGroupOperateType();
InlongGroupInfo groupInfo = form.getGroupInfo();
Integer nextStatus;
- switch (groupOperateType) {
+ switch (operateType) {
case RESTART:
nextStatus = GroupStatus.RESTARTED.getCode();
break;
@@ -63,8 +63,7 @@ public class GroupUpdateCompleteListener implements ProcessEventListener {
nextStatus = GroupStatus.DELETED.getCode();
break;
default:
- throw new RuntimeException(
- String.format("Unsupported operation=%s for Inlong group", groupOperateType));
+ throw new RuntimeException(String.format("Unsupported operation=%s for inlong group", operateType));
}
// Update inlong group status and other info
groupService.updateStatus(groupInfo.getInlongGroupId(), nextStatus, operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
index 1b8689a1e..33312397b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
@@ -50,11 +50,11 @@ public class StreamUpdateCompleteListener implements ProcessEventListener {
StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
final InlongStreamInfo streamInfo = form.getStreamInfo();
final String operator = context.getOperator();
- final GroupOperateType groupOperateType = form.getGroupOperateType();
+ final GroupOperateType operateType = form.getGroupOperateType();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
StreamStatus status;
- switch (groupOperateType) {
+ switch (operateType) {
case RESTART:
status = StreamStatus.RESTARTED;
break;
@@ -65,8 +65,7 @@ public class StreamUpdateCompleteListener implements ProcessEventListener {
status = StreamStatus.DELETED;
break;
default:
- throw new RuntimeException(
- String.format("Unsupported operation=%s for Inlong group", groupOperateType));
+ throw new RuntimeException(String.format("Unsupported operation=%s for inlong group", operateType));
}
streamService.updateStatus(groupId, streamId, status.getCode(), operator);
streamService.update(streamInfo.genRequest(), operator);