You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/31 03:55:06 UTC

[incubator-inlong] branch master updated: [INLONG-4445][Manager] Change the relationship to relation to adapt the Sort protocol (#4446)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f1cf8c0e [INLONG-4445][Manager] Change the relationship to relation to adapt the Sort protocol (#4446)
1f1cf8c0e is described below

commit 1f1cf8c0ec0c3c0cf9f464efa2466a8bd82eb83e
Author: healzhou <he...@gmail.com>
AuthorDate: Tue May 31 11:55:02 2022 +0800

    [INLONG-4445][Manager] Change the relationship to relation to adapt the Sort protocol (#4446)
    
    * Change the relationship to relation to adapt the Sort protocol
    
    * Rename some variables and methods
---
 .../manager/client/api/impl/InlongStreamImpl.java  | 52 +++++++++++-----------
 .../inlong/manager/common/auth/Authentication.java |  2 +-
 .../inlong/manager/common/enums/GroupMode.java     |  2 +-
 .../inlong/manager/common/enums/MetaFieldType.java | 12 ++---
 .../inlong/manager/common/enums/TransformType.java |  4 +-
 .../manager/common/pojo/sort/BaseSortConf.java     |  2 +-
 .../common/pojo/source/kafka/KafkaOffset.java      |  2 +-
 ...deRelationship.java => StreamNodeRelation.java} |  8 ++--
 .../manager/common/pojo/stream/StreamPipeline.java | 16 +++----
 .../pojo/transform/filter/FilterDefinition.java    | 31 ++++++-------
 .../transform/splitter/SplitterDefinition.java     | 22 ++++-----
 .../common/pojo/stream/StreamPipelineTest.java     | 10 ++---
 .../service/core/impl/ConsumptionServiceImpl.java  |  4 +-
 .../service/core/impl/InlongStreamServiceImpl.java | 19 ++++----
 .../service/sink/StreamSinkServiceImpl.java        |  2 -
 .../service/sort/CreateSortConfigListenerV2.java   | 14 +++---
 .../sort/CreateStreamSortConfigListener.java       |  5 ++-
 .../service/sort/light/LightGroupSortListener.java |  8 ++--
 .../service/sort/util/ExtractNodeUtils.java        |  2 +-
 .../service/sort/util/FieldRelationUtils.java      | 16 +++----
 .../service/sort/util/FilterFunctionUtils.java     |  4 +-
 .../manager/service/sort/util/LoadNodeUtils.java   | 20 ++++-----
 ...lationShipUtils.java => NodeRelationUtils.java} | 43 +++++++++---------
 .../service/sort/util/TransformNodeUtils.java      |  4 +-
 .../NewConsumptionWorkflowDefinition.java          |  2 +-
 .../workflow/group/NewGroupWorkflowDefinition.java |  2 +-
 .../listener/GroupUpdateCompleteListener.java      |  7 ++-
 .../listener/StreamUpdateCompleteListener.java     |  7 ++-
 28 files changed, 154 insertions(+), 168 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 1b85538ce..f73d56a70 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -36,7 +36,7 @@ import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
-import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelationship;
+import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelation;
 import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
 import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
 import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
@@ -213,28 +213,28 @@ public class InlongStreamImpl implements InlongStream {
     public StreamPipeline createPipeline() {
         StreamPipeline streamPipeline = new StreamPipeline();
         if (MapUtils.isEmpty(streamTransforms)) {
-            StreamNodeRelationship relationship = new StreamNodeRelationship();
-            relationship.setInputNodes(streamSources.keySet());
-            relationship.setOutputNodes(streamSinks.keySet());
-            streamPipeline.setPipeline(Lists.newArrayList(relationship));
+            StreamNodeRelation relation = new StreamNodeRelation();
+            relation.setInputNodes(streamSources.keySet());
+            relation.setOutputNodes(streamSinks.keySet());
+            streamPipeline.setPipeline(Lists.newArrayList(relation));
             return streamPipeline;
         }
 
-        Map<Set<String>, List<StreamNodeRelationship>> relationshipMap = Maps.newHashMap();
+        Map<Set<String>, List<StreamNodeRelation>> relationMap = Maps.newHashMap();
         // Check preNodes
         for (StreamTransform streamTransform : streamTransforms.values()) {
             String transformName = streamTransform.getTransformName();
             Set<String> preNodes = streamTransform.getPreNodes();
-            StreamNodeRelationship relationship = new StreamNodeRelationship();
-            relationship.setInputNodes(preNodes);
-            relationship.setOutputNodes(Sets.newHashSet(transformName));
+            StreamNodeRelation relation = new StreamNodeRelation();
+            relation.setInputNodes(preNodes);
+            relation.setOutputNodes(Sets.newHashSet(transformName));
             for (String preNode : preNodes) {
                 StreamTransform transform = streamTransforms.get(preNode);
                 if (transform != null) {
                     transform.addPost(transformName);
                 }
             }
-            relationshipMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relationship);
+            relationMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relation);
         }
         // Check postNodes
         for (StreamTransform streamTransform : streamTransforms.values()) {
@@ -248,29 +248,29 @@ public class InlongStreamImpl implements InlongStream {
                 }
             }
             if (CollectionUtils.isNotEmpty(sinkSet)) {
-                StreamNodeRelationship relationship = new StreamNodeRelationship();
+                StreamNodeRelation relation = new StreamNodeRelation();
                 Set<String> preNodes = Sets.newHashSet(transformName);
-                relationship.setInputNodes(preNodes);
-                relationship.setOutputNodes(sinkSet);
-                relationshipMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relationship);
+                relation.setInputNodes(preNodes);
+                relation.setOutputNodes(sinkSet);
+                relationMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relation);
             }
         }
-        List<StreamNodeRelationship> relationships = Lists.newArrayList();
-        // Merge StreamNodeRelationship with same preNodes
-        for (Map.Entry<Set<String>, List<StreamNodeRelationship>> entry : relationshipMap.entrySet()) {
-            List<StreamNodeRelationship> unmergedRelationships = entry.getValue();
-            if (unmergedRelationships.size() == 1) {
-                relationships.add(unmergedRelationships.get(0));
+        List<StreamNodeRelation> relations = Lists.newArrayList();
+        // Merge StreamNodeRelation with same preNodes
+        for (Map.Entry<Set<String>, List<StreamNodeRelation>> entry : relationMap.entrySet()) {
+            List<StreamNodeRelation> unmergedRelations = entry.getValue();
+            if (unmergedRelations.size() == 1) {
+                relations.add(unmergedRelations.get(0));
             } else {
-                StreamNodeRelationship mergedRelationship = unmergedRelationships.get(0);
-                for (int index = 1; index < unmergedRelationships.size(); index++) {
-                    StreamNodeRelationship unmergedRelationship = unmergedRelationships.get(index);
-                    unmergedRelationship.getOutputNodes().forEach(mergedRelationship::addOutputNode);
+                StreamNodeRelation mergedRelation = unmergedRelations.get(0);
+                for (int index = 1; index < unmergedRelations.size(); index++) {
+                    StreamNodeRelation unmergedRelation = unmergedRelations.get(index);
+                    unmergedRelation.getOutputNodes().forEach(mergedRelation::addOutputNode);
                 }
-                relationships.add(mergedRelationship);
+                relations.add(mergedRelation);
             }
         }
-        streamPipeline.setPipeline(relationships);
+        streamPipeline.setPipeline(relations);
         Pair<Boolean, Pair<String, String>> circleState = streamPipeline.hasCircle();
         if (circleState.getLeft()) {
             Pair<String, String> circleNodes = circleState.getRight();
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
index 9691d44c1..6231f91f1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/auth/Authentication.java
@@ -42,7 +42,7 @@ public interface Authentication {
                     return authType;
                 }
             }
-            throw new IllegalArgumentException(String.format("Unsupported authType=%s for Inlong", type));
+            throw new IllegalArgumentException(String.format("Unsupported authType=%s", type));
         }
 
         @Override
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
index 433791d18..1cd2cd414 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
@@ -49,7 +49,7 @@ public enum GroupMode {
                 return groupMode;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupported group mode=%s for Inlong", mode));
+        throw new IllegalArgumentException(String.format("Unsupported group mode=%s", mode));
     }
 
     public static GroupMode parseGroupMode(InlongGroupInfo groupInfo) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
index 6e9d8f49f..3a52e41a0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MetaFieldType.java
@@ -25,7 +25,7 @@ public enum MetaFieldType {
     /**
      * database
      */
-    DATABASE("database", "meta field database used in canal json or mysql binlong and so on"),
+    DATABASE("database", "meta field database used in canal json or mysql binlog and so on"),
 
     /**
      * processing_time
@@ -35,27 +35,27 @@ public enum MetaFieldType {
     /**
      * data_time
      */
-    DATA_TIME("data_time", "meta field data_time used in canal json or mysql binlong and so on"),
+    DATA_TIME("data_time", "meta field data_time used in canal json or mysql binlog and so on"),
 
     /**
      * table
      */
-    TABLE("table", "meta field table used in canal json or mysql binlong and so on"),
+    TABLE("table", "meta field table used in canal json or mysql binlog and so on"),
 
     /**
      * event_time
      */
-    EVENT_TIME("event_time", "meta field event_time used in canal json or mysql binlong and so on"),
+    EVENT_TIME("event_time", "meta field event_time used in canal json or mysql binlog and so on"),
 
     /**
      * is_ddl
      */
-    IS_DDL("is_ddl", "meta field is_ddl used in canal json or mysql binlong and so on"),
+    IS_DDL("is_ddl", "meta field is_ddl used in canal json or mysql binlog and so on"),
 
     /**
      * event_type
      */
-    EVENT_TYPE("event_type", "meta field event_type used in canal json or mysql binlong and so on"),
+    EVENT_TYPE("event_type", "meta field event_type used in canal json or mysql binlog and so on"),
 
     /**
      * data
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
index b4d326b91..deaa62a74 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
@@ -49,7 +49,7 @@ public enum TransformType {
     JOINER("joiner");
 
     @Getter
-    private String type;
+    private final String type;
 
     TransformType(String type) {
         this.type = type;
@@ -61,7 +61,7 @@ public enum TransformType {
                 return transformType;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupported transform=%s for Inlong", type));
+        throw new IllegalArgumentException(String.format("Unsupported transformType=%s", type));
     }
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java
index 3187aa6b7..361aff738 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/BaseSortConf.java
@@ -46,7 +46,7 @@ public abstract class BaseSortConf {
                     return sortType;
                 }
             }
-            throw new IllegalArgumentException(String.format("Unsupported type=%s for Inlong", type));
+            throw new IllegalArgumentException(String.format("Unsupported sortType=%s", type));
         }
 
         public String getType() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaOffset.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaOffset.java
index 539291947..1a66aa532 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaOffset.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaOffset.java
@@ -43,7 +43,7 @@ public enum KafkaOffset {
                 return dataFormat;
             }
         }
-        throw new IllegalArgumentException(String.format("Unsupported KafkaOffset=%s for Inlong", name));
+        throw new IllegalArgumentException(String.format("Unsupported KafkaOffset=%s", name));
     }
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelation.java
similarity index 86%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelation.java
index 791edb987..bfbf38ca5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelation.java
@@ -24,20 +24,20 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import java.util.Set;
 
 /**
- * Stream node relationship info, including input node name list, output node name list.
+ * Stream node relation info, including input node name list, output node name list.
  */
 @Data
-public class StreamNodeRelationship {
+public class StreamNodeRelation {
 
     private Set<String> inputNodes;
 
     private Set<String> outputNodes;
 
-    public StreamNodeRelationship() {
+    public StreamNodeRelation() {
         this(Sets.newHashSet(), Sets.newHashSet());
     }
 
-    public StreamNodeRelationship(Set<String> inputNodes, Set<String> outputNodes) {
+    public StreamNodeRelation(Set<String> inputNodes, Set<String> outputNodes) {
         this.inputNodes = inputNodes;
         this.outputNodes = outputNodes;
     }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
index 6efd61ec5..24d1a1cee 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
@@ -33,24 +33,24 @@ import java.util.Queue;
 import java.util.Set;
 
 /**
- * Stream pipeline, save stream node relationship list.
+ * Stream pipeline, save stream node relation list.
  */
 @Data
 public class StreamPipeline {
 
-    private List<StreamNodeRelationship> pipeline;
+    private List<StreamNodeRelation> pipeline;
 
     public StreamPipeline() {
         this(Lists.newArrayList());
     }
 
-    public StreamPipeline(List<StreamNodeRelationship> pipeline) {
+    public StreamPipeline(List<StreamNodeRelation> pipeline) {
         Preconditions.checkNotNull(pipeline, "Pipeline should not be null");
         this.pipeline = pipeline;
     }
 
-    public void addRelationship(StreamNodeRelationship relationship) {
-        pipeline.add(relationship);
+    public void addRelation(StreamNodeRelation relation) {
+        pipeline.add(relation);
     }
 
     /**
@@ -58,9 +58,9 @@ public class StreamPipeline {
      */
     public Pair<Boolean, Pair<String, String>> hasCircle() {
         Map<String, Set<String>> priorityMap = Maps.newHashMap();
-        for (StreamNodeRelationship relationship : pipeline) {
-            Set<String> inputNodes = relationship.getInputNodes();
-            Set<String> outputNodes = relationship.getOutputNodes();
+        for (StreamNodeRelation relation : pipeline) {
+            Set<String> inputNodes = relation.getInputNodes();
+            Set<String> outputNodes = relation.getOutputNodes();
             for (String inputNode : inputNodes) {
                 for (String outputNode : outputNodes) {
                     priorityMap.computeIfAbsent(inputNode, key -> Sets.newHashSet()).add(outputNode);
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
index d01e2ee08..248347d15 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
@@ -29,12 +29,23 @@ import java.util.List;
 
 /**
  * A class to define operation to filter stream records by different modes.
- * Rule mode is more recommended then script mode
+ * Rule mode is more recommended than script mode.
  */
 @Data
 @EqualsAndHashCode(callSuper = true)
 public class FilterDefinition extends TransformDefinition {
 
+    /**
+     * Strategy for Filter transform
+     */
+    private FilterStrategy filterStrategy;
+    /**
+     * Mode for Filter transform
+     */
+    private FilterMode filterMode;
+    private List<FilterRule> filterRules;
+    private ScriptBase scriptBase;
+
     public FilterDefinition(FilterStrategy filterStrategy, List<FilterRule> filterRules) {
         this.transformType = TransformType.FILTER;
         this.filterStrategy = filterStrategy;
@@ -59,16 +70,6 @@ public class FilterDefinition extends TransformDefinition {
         RULE, SCRIPT
     }
 
-    /**
-     * Strategy for Filter transform
-     */
-    private FilterStrategy filterStrategy;
-
-    /**
-     * Mode for Filter transform
-     */
-    private FilterMode filterMode;
-
     @Data
     @AllArgsConstructor
     public static class TargetValue {
@@ -84,8 +85,8 @@ public class FilterDefinition extends TransformDefinition {
     }
 
     /**
-     * Filter rule is about relationship between sourceField and targetValue;
-     * such as 'a >= b' or 'a is not null'
+     * Filter rule is about relation between sourceField and targetValue.
+     * Such as 'a >= b' or 'a were not null'
      */
     @Data
     @AllArgsConstructor
@@ -100,8 +101,6 @@ public class FilterDefinition extends TransformDefinition {
         private RuleRelation relationWithPost;
     }
 
-    private List<FilterRule> filterRules;
-
     @Data
     @AllArgsConstructor
     public static class ScriptBase {
@@ -110,6 +109,4 @@ public class FilterDefinition extends TransformDefinition {
 
         private String script;
     }
-
-    private ScriptBase scriptBase;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
index bcaa33503..62f4fce3f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
@@ -31,10 +31,15 @@ import java.util.List;
  * A class to define operation to split fields according to SplitRule defined.
  */
 @Data
-@EqualsAndHashCode(callSuper = true)
 @Builder
+@EqualsAndHashCode(callSuper = true)
 public class SplitterDefinition extends TransformDefinition {
 
+    /**
+     * Split rules for transform;
+     */
+    private List<SplitRule> splitRules;
+
     public SplitterDefinition(List<SplitRule> splitRules) {
         this.transformType = TransformType.SPLITTER;
         this.splitRules = splitRules;
@@ -42,31 +47,26 @@ public class SplitterDefinition extends TransformDefinition {
 
     /**
      * SplitterRule is aim to define a splitter action below:
-     * SourceField will be splitted to targetFields by seperator
+     * SourceField will be split to targetFields by separator
      */
     @Data
     @AllArgsConstructor
     public static class SplitRule {
 
         /**
-         * Field to split;
+         * Field to split
          */
         private StreamField sourceField;
 
         /**
-         * String seperator to split sourceField;
+         * String separator to split sourceField
          */
-        private String seperator;
+        private String separator;
 
         /**
-         * Fields generated when sourceField is splitted
+         * Fields generated when sourceField is split.
          * Use sourceName_0, sourceName_1, sourceName_2 if not set
          */
         private List<String> targetFields;
     }
-
-    /**
-     * Split rules for transform;
-     */
-    private List<SplitRule> splitRules;
 }
diff --git a/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
index af29276b5..3e8d20b4e 100644
--- a/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
+++ b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
@@ -30,11 +30,11 @@ public class StreamPipelineTest {
     @Test
     public void testCheckHasCircle() {
         StreamPipeline streamPipeline = new StreamPipeline();
-        streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("A", "B"), Sets.newHashSet("C")));
-        streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("C"), Sets.newHashSet("D")));
-        streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("D"), Sets.newHashSet("E", "F")));
-        streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("F"), Sets.newHashSet("G")));
-        streamPipeline.addRelationship(new StreamNodeRelationship(Sets.newHashSet("E"), Sets.newHashSet("H", "C")));
+        streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("A", "B"), Sets.newHashSet("C")));
+        streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("C"), Sets.newHashSet("D")));
+        streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("D"), Sets.newHashSet("E", "F")));
+        streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("F"), Sets.newHashSet("G")));
+        streamPipeline.addRelation(new StreamNodeRelation(Sets.newHashSet("E"), Sets.newHashSet("H", "C")));
         Pair<Boolean, Pair<String, String>> circleState = streamPipeline.hasCircle();
         Assert.assertTrue(circleState.getLeft());
         Assert.assertTrue(Sets.newHashSet("E","C").contains(circleState.getRight().getLeft()));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 0030ead9b..b6a7cf385 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -405,8 +405,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         // Tube’s topic is the inlong group level, one inlong group, one Tube topic
         MQType mqType = MQType.forType(topicVO.getMqType());
         if (mqType == MQType.TUBE) {
-            String bizTopic = topicVO.getMqResource();
-            Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
+            String mqResource = topicVO.getMqResource();
+            Preconditions.checkTrue(mqResource == null || mqResource.equals(info.getTopic()),
                     "topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
         } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             // Pulsar's topic is the inlong stream level.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 3ca608413..03da93075 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -104,7 +104,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be added
-        checkBizIsTempStatus(groupId);
+        checkGroupStatusIsTemp(groupId);
 
         // The streamId under the same groupId cannot be repeated
         Integer count = streamMapper.selectExistByIdentifier(groupId, streamId);
@@ -170,7 +170,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
                 .collect(Collectors.groupingBy(InlongStreamExtInfo::getInlongStreamId,
                         HashMap::new,
                         Collectors.toCollection(ArrayList::new)));
-        streamList.stream().forEach(streamInfo -> {
+        streamList.forEach(streamInfo -> {
             String streamId = streamInfo.getInlongStreamId();
             List<StreamField> fieldInfos = streamFieldMap.get(streamId);
             streamInfo.setFieldList(fieldInfos);
@@ -246,7 +246,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be modified
-        InlongGroupEntity inlongGroupEntity = this.checkBizIsTempStatus(groupId);
+        InlongGroupEntity inlongGroupEntity = this.checkGroupStatusIsTemp(groupId);
 
         // Make sure the stream was exists
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
@@ -280,7 +280,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
-        this.checkBizIsTempStatus(groupId);
+        this.checkGroupStatusIsTemp(groupId);
 
         InlongStreamEntity entity = streamMapper.selectByIdentifier(groupId, streamId);
         if (entity == null) {
@@ -322,7 +322,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
-        this.checkBizIsTempStatus(groupId);
+        this.checkGroupStatusIsTemp(groupId);
 
         List<InlongStreamEntity> entityList = streamMapper.selectByGroupId(groupId);
         if (CollectionUtils.isEmpty(entityList)) {
@@ -377,9 +377,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         InlongStreamRequest streamRequest = fullStreamRequest.getStreamInfo();
         Preconditions.checkNotNull(streamRequest, "inlong stream info is empty");
 
-        // Check whether it can be added: check by lower-level specific services
-        // this.checkBizIsTempStatus(streamInfo.getInlongGroupId());
-
         // Save inlong stream
         save(streamRequest, operator);
 
@@ -413,7 +410,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         InlongStreamRequest firstStream = fullStreamRequestList.get(0).getStreamInfo();
         Preconditions.checkNotNull(firstStream, "inlong stream info is empty");
         String groupId = firstStream.getInlongGroupId();
-        this.checkBizIsTempStatus(groupId);
+        this.checkGroupStatusIsTemp(groupId);
 
         // This bulk save is only used when creating or editing inlong group after approval is rejected.
         // To ensure data consistency, you need to physically delete all associated data and then add
@@ -627,7 +624,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         }
 
         List<InlongStreamExtEntity> entityList = CommonBeanUtils.copyListProperties(exts, InlongStreamExtEntity::new);
-        entityList.stream().forEach(streamEntity -> {
+        entityList.forEach(streamEntity -> {
             streamEntity.setInlongGroupId(groupId);
             streamEntity.setInlongStreamId(streamId);
         });
@@ -641,7 +638,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
      * @param groupId inlong group id
      * @return inlong group entity
      */
-    private InlongGroupEntity checkBizIsTempStatus(String groupId) {
+    private InlongGroupEntity checkGroupStatusIsTemp(String groupId) {
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         Preconditions.checkNotNull(entity, "groupId is invalid");
         // Add/modify/delete is not allowed under certain inlong group status
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index bdf3168f9..c99c94b3b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -226,8 +226,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     public Boolean delete(Integer id, String operator) {
         LOGGER.info("begin to delete sink by id={}", id);
         Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
-        // Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
-
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
         commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index ce26c9140..262a0e7e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -47,6 +47,7 @@ import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -127,7 +128,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
             List<InlongStreamInfo> streamInfoList) {
         MQType mqType = MQType.forType(groupInfo.getMqType());
         if (mqType != MQType.PULSAR) {
-            String errMsg = String.format("Unsupported MqType={%s} for Inlong", mqType);
+            String errMsg = String.format("Unsupported mqType={%s}", mqType);
             log.error(errMsg);
             throw new WorkflowListenerException(errMsg);
         }
@@ -152,7 +153,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
                     pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
                 }
             }
-            pulsarSource.setScanStartupMode("earliest");
+            pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
             pulsarSource.setFieldList(streamInfo.getFieldList());
             sourceMap.computeIfAbsent(streamInfo.getInlongStreamId(), key -> Lists.newArrayList())
                     .add(pulsarSource);
@@ -167,13 +168,10 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
         return nodes;
     }
 
-    private List<NodeRelation> createNodeRelationsForStream(
-            List<StreamSource> sources, List<StreamSink> streamSinks) {
+    private List<NodeRelation> createNodeRelationsForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
         NodeRelation relation = new NodeRelation();
-        List<String> inputs = sources.stream().map(StreamSource::getSourceName)
-                .collect(Collectors.toList());
-        List<String> outputs = streamSinks.stream().map(StreamSink::getSinkName)
-                .collect(Collectors.toList());
+        List<String> inputs = sources.stream().map(StreamSource::getSourceName).collect(Collectors.toList());
+        List<String> outputs = streamSinks.stream().map(StreamSink::getSinkName).collect(Collectors.toList());
         relation.setInputs(inputs);
         relation.setOutputs(outputs);
         return Lists.newArrayList(relation);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 27e3b141a..02e848de2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -47,6 +47,7 @@ import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -120,7 +121,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
     private List<StreamSource> createPulsarSources(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo) {
         MQType mqType = MQType.forType(groupInfo.getMqType());
         if (mqType != MQType.PULSAR) {
-            String errMsg = String.format("Unsupported MqType={%s} for Inlong", mqType);
+            String errMsg = String.format("Unsupported mqType={%s}", mqType);
             log.error(errMsg);
             throw new WorkflowListenerException(errMsg);
         }
@@ -143,7 +144,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
                 pulsarSource.setPrimaryKey(((KafkaSource) source).getPrimaryKey());
             }
         }
-        pulsarSource.setScanStartupMode("earliest");
+        pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
         pulsarSource.setFieldList(streamInfo.getFieldList());
         return Lists.newArrayList(pulsarSource);
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
index e018d69b8..6308b70cd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
@@ -31,7 +31,7 @@ import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
 import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
-import org.apache.inlong.manager.service.sort.util.NodeRelationShipUtils;
+import org.apache.inlong.manager.service.sort.util.NodeRelationUtils;
 import org.apache.inlong.manager.service.sort.util.TransformNodeUtils;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.service.transform.StreamTransformService;
@@ -122,12 +122,12 @@ public class LightGroupSortListener implements SortOperateListener {
             List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
                     transformMap.get(streamId), sinkMap.get(streamId));
             StreamInfo streamInfo = new StreamInfo(streamId, nodes,
-                    NodeRelationShipUtils.createNodeRelationShipsForStream(stream));
+                    NodeRelationUtils.createNodeRelationsForStream(stream));
             streamInfos.add(streamInfo);
 
-            // Rebuild joinerNode relationship
+            // Rebuild joinerNode relation
             List<TransformResponse> transformResponseList = transformMap.get(streamId);
-            NodeRelationShipUtils.optimizeNodeRelationShips(streamInfo, transformResponseList);
+            NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
         }
 
         return new GroupInfo(groupInfo.getInlongGroupId(), streamInfos);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 83cccc84e..9447b156a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -180,7 +180,7 @@ public class ExtractNodeUtils {
                 throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
         }
         KafkaOffset kafkaOffset = KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
-        KafkaScanStartupMode startupMode = null;
+        KafkaScanStartupMode startupMode;
         switch (kafkaOffset) {
             case EARLIEST:
                 startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
index 301914e73..5793db886 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
@@ -77,7 +77,7 @@ public class FieldRelationUtils {
                 return createJoinerFieldRelations(fieldList, transformName);
             default:
                 throw new UnsupportedOperationException(
-                        String.format("Unsupported transformType=%s for Inlong", transformType));
+                        String.format("Unsupported transformType=%s", transformType));
         }
     }
 
@@ -99,8 +99,7 @@ public class FieldRelationUtils {
     /**
      * Create relation of fields in join function.
      */
-    private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList,
-                                                                  String transformName) {
+    private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, String transformName) {
         return fieldList.stream()
                 .map(streamField -> {
                     FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
@@ -139,8 +138,7 @@ public class FieldRelationUtils {
     /**
      * Create relation of fields in replace function.
      */
-    private static List<FieldRelation> createReplacerFieldRelations(
-            List<StreamField> fieldList, String transformName,
+    private static List<FieldRelation> createReplacerFieldRelations(List<StreamField> fieldList, String transformName,
             StringReplacerDefinition replacerDefinition, String preNodes) {
         Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
         String preNode = preNodes.split(",")[0];
@@ -183,7 +181,7 @@ public class FieldRelationUtils {
      * Parse rule of replacer.
      */
     private static FieldRelation parseReplaceRule(ReplaceRule replaceRule, Set<String> replaceFields,
-                                                  String transformName, String preNode) {
+            String transformName, String preNode) {
         StreamField sourceField = replaceRule.getSourceField();
         final String fieldName = sourceField.getFieldName();
         String regex = replaceRule.getRegex();
@@ -209,16 +207,16 @@ public class FieldRelationUtils {
      * Parse rule of split.
      */
     private static List<FieldRelation> parseSplitRule(SplitRule splitRule, Set<String> splitFields,
-                                                      String transformName, String preNode) {
+            String transformName, String preNode) {
         StreamField sourceField = splitRule.getSourceField();
         FieldInfo fieldInfo = FieldInfoUtils.parseStreamField(sourceField);
         fieldInfo.setNodeId(preNode);
-        String seperator = splitRule.getSeperator();
+        String separator = splitRule.getSeparator();
         List<String> targetSources = splitRule.getTargetFields();
         List<FieldRelation> splitRelations = Lists.newArrayList();
         for (int index = 0; index < targetSources.size(); index++) {
             SplitIndexFunction splitIndexFunction = new SplitIndexFunction(
-                    fieldInfo, new StringConstantParam(seperator), new ConstantParam(index));
+                    fieldInfo, new StringConstantParam(separator), new ConstantParam(index));
             FieldInfo targetFieldInfo = new FieldInfo(
                     targetSources.get(index), transformName, FieldInfoUtils.convertFieldFormat(FieldType.STRING.name())
             );
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
index 2bc91662e..53c5dbfee 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
@@ -77,7 +77,7 @@ public class FilterFunctionUtils {
                 return Lists.newArrayList();
             default:
                 throw new UnsupportedOperationException(
-                        String.format("Unsupported transformType=%s for Inlong", transformType));
+                        String.format("Unsupported transformType=%s", transformType));
         }
     }
 
@@ -129,7 +129,7 @@ public class FilterFunctionUtils {
                 return null;
             default:
                 throw new UnsupportedOperationException(
-                        String.format("Unsupported transformType=%s for Inlong", transformType));
+                        String.format("Unsupported transformType=%s", transformType));
         }
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index dd8191703..ffa9054cd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -97,7 +97,7 @@ public class LoadNodeUtils {
         List<FieldInfo> fieldInfos = fieldList.stream()
                 .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, name))
                 .collect(Collectors.toList());
-        List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
+        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
         Map<String, String> properties = kafkaSink.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
         Integer sinkParallelism = null;
@@ -129,7 +129,7 @@ public class LoadNodeUtils {
         return new KafkaLoadNode(id,
                 name,
                 fieldInfos,
-                fieldRelationships,
+                fieldRelations,
                 Lists.newArrayList(),
                 null,
                 topicName,
@@ -154,7 +154,7 @@ public class LoadNodeUtils {
         List<FieldInfo> fields = fieldList.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
                 .collect(Collectors.toList());
-        List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
+        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
         Map<String, String> properties = hiveSink.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
         List<FieldInfo> partitionFields = Lists.newArrayList();
@@ -168,7 +168,7 @@ public class LoadNodeUtils {
                 id,
                 name,
                 fields,
-                fieldRelationships,
+                fieldRelations,
                 Lists.newArrayList(),
                 null,
                 null,
@@ -193,14 +193,14 @@ public class LoadNodeUtils {
         List<FieldInfo> fields = fieldList.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
                 .collect(Collectors.toList());
-        List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
+        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
         Map<String, String> properties = hbaseSink.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
         return new HbaseLoadNode(
                 id,
                 name,
                 fields,
-                fieldRelationships,
+                fieldRelations,
                 Lists.newArrayList(),
                 null,
                 null,
@@ -225,10 +225,10 @@ public class LoadNodeUtils {
         List<FieldInfo> fields = fieldList.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
                 .collect(Collectors.toList());
-        List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
+        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
         return new PostgresLoadNode(postgresSink.getSinkName(),
                 postgresSink.getSinkName(),
-                fields, fieldRelationships, null, null, 1,
+                fields, fieldRelations, null, null, 1,
                 null, postgresSink.getJdbcUrl(), postgresSink.getUsername(),
                 postgresSink.getPassword(),
                 postgresSink.getDbName() + "." + postgresSink.getTableName(),
@@ -244,9 +244,9 @@ public class LoadNodeUtils {
         List<FieldInfo> fields = sinkFields.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
                 .collect(Collectors.toList());
-        List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
+        List<FieldRelation> fieldRelations = parseSinkFields(sinkFields, name);
         return new ClickHouseLoadNode(name, name,
-                fields, fieldRelationShips, null, null, 1,
+                fields, fieldRelations, null, null, 1,
                 null, ckSink.getTableName(),
                 ckSink.getJdbcUrl(),
                 ckSink.getUsername(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
similarity index 83%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
index b9791ea64..d5bafbcd5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationUtils.java
@@ -56,33 +56,33 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * Util for creat node relationship.
+ * Util for create node relation.
  */
 @Slf4j
-public class NodeRelationShipUtils {
+public class NodeRelationUtils {
 
     /**
-     * Create node relationship for the given stream
+     * Create node relation for the given stream
      */
-    public static List<NodeRelation> createNodeRelationShipsForStream(InlongStreamInfo streamInfo) {
+    public static List<NodeRelation> createNodeRelationsForStream(InlongStreamInfo streamInfo) {
         String tempView = streamInfo.getExtParams();
         if (StringUtils.isEmpty(tempView)) {
-            log.warn("StreamNodeRelationShip is empty for Stream={}", streamInfo);
+            log.warn("stream node relation is empty for {}", streamInfo);
             return Lists.newArrayList();
         }
         StreamPipeline pipeline = StreamParseUtils.parseStreamPipeline(streamInfo.getExtParams(),
                 streamInfo.getInlongStreamId());
         return pipeline.getPipeline().stream()
-                .map(nodeRelationship -> new NodeRelation(
-                        Lists.newArrayList(nodeRelationship.getInputNodes()),
-                        Lists.newArrayList(nodeRelationship.getOutputNodes())))
+                .map(nodeRelation -> new NodeRelation(
+                        Lists.newArrayList(nodeRelation.getInputNodes()),
+                        Lists.newArrayList(nodeRelation.getOutputNodes())))
                 .collect(Collectors.toList());
     }
 
     /**
-     * Optimize relationship of node, JoinerRelationship must be rebuilt.
+     * Optimize relation of node, JoinerRelation must be rebuilt.
      */
-    public static void optimizeNodeRelationShips(StreamInfo streamInfo, List<TransformResponse> transformResponses) {
+    public static void optimizeNodeRelation(StreamInfo streamInfo, List<TransformResponse> transformResponses) {
         if (CollectionUtils.isEmpty(transformResponses)) {
             return;
         }
@@ -100,27 +100,26 @@ public class NodeRelationShipUtils {
                     return transformDefinition.getTransformType() == TransformType.JOINER;
                 }).collect(Collectors.toMap(TransformNode::getName, transformNode -> transformNode));
 
-        List<NodeRelation> relationships = streamInfo.getRelations();
-        Iterator<NodeRelation> shipIterator = relationships.listIterator();
-        List<NodeRelation> joinRelationships = Lists.newArrayList();
+        List<NodeRelation> relations = streamInfo.getRelations();
+        Iterator<NodeRelation> shipIterator = relations.listIterator();
+        List<NodeRelation> joinRelations = Lists.newArrayList();
         while (shipIterator.hasNext()) {
-            NodeRelation relationship = shipIterator.next();
-            List<String> outputs = relationship.getOutputs();
+            NodeRelation relation = shipIterator.next();
+            List<String> outputs = relation.getOutputs();
             if (outputs.size() == 1) {
                 String nodeName = outputs.get(0);
                 if (joinNodes.get(nodeName) != null) {
                     TransformDefinition transformDefinition = transformTypeMap.get(nodeName);
                     TransformNode transformNode = joinNodes.get(nodeName);
-                    joinRelationships.add(createNodeRelationShip((JoinerDefinition) transformDefinition, relationship));
+                    joinRelations.add(getNodeRelation((JoinerDefinition) transformDefinition, relation));
                     shipIterator.remove();
                 }
             }
         }
-        relationships.addAll(joinRelationships);
+        relations.addAll(joinRelations);
     }
 
-    private static NodeRelation createNodeRelationShip(JoinerDefinition joinerDefinition,
-                                                       NodeRelation nodeRelationship) {
+    private static NodeRelation getNodeRelation(JoinerDefinition joinerDefinition, NodeRelation nodeRelation) {
         JoinMode joinMode = joinerDefinition.getJoinMode();
         String leftNode = getNodeName(joinerDefinition.getLeftNode());
         String rightNode = getNodeName(joinerDefinition.getRightNode());
@@ -143,11 +142,11 @@ public class NodeRelationShipUtils {
         joinConditions.put(rightNode, filterFunctions);
         switch (joinMode) {
             case LEFT_JOIN:
-                return new LeftOuterJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
+                return new LeftOuterJoinNodeRelation(preNodes, nodeRelation.getOutputs(), joinConditions);
             case INNER_JOIN:
-                return new InnerJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
+                return new InnerJoinNodeRelation(preNodes, nodeRelation.getOutputs(), joinConditions);
             case RIGHT_JOIN:
-                return new RightOuterJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
+                return new RightOuterJoinNodeRelation(preNodes, nodeRelation.getOutputs(), joinConditions);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported join mode=%s for inlong", joinMode));
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index 4f480ac32..cbf831cab 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -63,7 +63,7 @@ public class TransformNodeUtils {
      * Create distinct node based on deDuplicationDefinition
      */
     public static DistinctNode createDistinctNode(DeDuplicationDefinition deDuplicationDefinition,
-                                                  TransformResponse transformResponse) {
+            TransformResponse transformResponse) {
         List<StreamField> streamFields = deDuplicationDefinition.getDupFields();
         List<FieldInfo> distinctFields = streamFields.stream()
                 .map(FieldInfoUtils::parseStreamField)
@@ -71,7 +71,7 @@ public class TransformNodeUtils {
         StreamField timingField = deDuplicationDefinition.getTimingField();
         FieldInfo orderField = FieldInfoUtils.parseStreamField(timingField);
         DeDuplicationStrategy deDuplicationStrategy = deDuplicationDefinition.getDeDuplicationStrategy();
-        OrderDirection orderDirection = null;
+        OrderDirection orderDirection;
         switch (deDuplicationStrategy) {
             case RESERVE_LAST:
                 orderDirection = OrderDirection.DESC;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
index 0c23c4314..e237344d9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
@@ -105,7 +105,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
         adminUserTask.addListener(consumptionPassTaskListener);
         process.addTask(adminUserTask);
 
-        // Set order relationship
+        // Set order relation
         startEvent.addNext(groupOwnerUserTask);
         groupOwnerUserTask.addNext(adminUserTask);
         adminUserTask.addNext(endEvent);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
index 245d1a5e4..eee0adc93 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
@@ -87,7 +87,7 @@ public class NewGroupWorkflowDefinition implements WorkflowDefinition {
         adminUserTask.addListener(groupAfterApprovedListener);
         process.addTask(adminUserTask);
 
-        // Configuration order relationship
+        // Configuration order relation
         startEvent.addNext(adminUserTask);
         // If you need another approval process, you can add it here
         adminUserTask.addNext(endEvent);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
index 974520e32..0e35ff280 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
@@ -49,10 +49,10 @@ public class GroupUpdateCompleteListener implements ProcessEventListener {
     public ListenerResult listen(WorkflowContext context) throws Exception {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
         String operator = context.getOperator();
-        GroupOperateType groupOperateType = form.getGroupOperateType();
+        GroupOperateType operateType = form.getGroupOperateType();
         InlongGroupInfo groupInfo = form.getGroupInfo();
         Integer nextStatus;
-        switch (groupOperateType) {
+        switch (operateType) {
             case RESTART:
                 nextStatus = GroupStatus.RESTARTED.getCode();
                 break;
@@ -63,8 +63,7 @@ public class GroupUpdateCompleteListener implements ProcessEventListener {
                 nextStatus = GroupStatus.DELETED.getCode();
                 break;
             default:
-                throw new RuntimeException(
-                        String.format("Unsupported operation=%s for Inlong group", groupOperateType));
+                throw new RuntimeException(String.format("Unsupported operation=%s for inlong group", operateType));
         }
         // Update inlong group status and other info
         groupService.updateStatus(groupInfo.getInlongGroupId(), nextStatus, operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
index 1b8689a1e..33312397b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
@@ -50,11 +50,11 @@ public class StreamUpdateCompleteListener implements ProcessEventListener {
         StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm();
         final InlongStreamInfo streamInfo = form.getStreamInfo();
         final String operator = context.getOperator();
-        final GroupOperateType groupOperateType = form.getGroupOperateType();
+        final GroupOperateType operateType = form.getGroupOperateType();
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         StreamStatus status;
-        switch (groupOperateType) {
+        switch (operateType) {
             case RESTART:
                 status = StreamStatus.RESTARTED;
                 break;
@@ -65,8 +65,7 @@ public class StreamUpdateCompleteListener implements ProcessEventListener {
                 status = StreamStatus.DELETED;
                 break;
             default:
-                throw new RuntimeException(
-                        String.format("Unsupported operation=%s for Inlong group", groupOperateType));
+                throw new RuntimeException(String.format("Unsupported operation=%s for inlong group", operateType));
         }
         streamService.updateStatus(groupId, streamId, status.getCode(), operator);
         streamService.update(streamInfo.genRequest(), operator);