You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/30 08:55:09 UTC

[incubator-inlong] branch master updated: [INLONG-4428][Sort][Manager] Optimize the name for Data Node related modules and classes (#4432)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d68ac524 [INLONG-4428][Sort][Manager] Optimize the name for Data Node related modules and classes (#4432)
0d68ac524 is described below

commit 0d68ac524578285d044abec0f9c7dd863b20288f
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon May 30 16:55:03 2022 +0800

    [INLONG-4428][Sort][Manager] Optimize the name for Data Node related modules and classes (#4432)
---
 .../service/sort/CreateSortConfigListenerV2.java   |  14 +-
 .../sort/CreateStreamSortConfigListener.java       |  16 +-
 ...ationShipUtils.java => FieldRelationUtils.java} |  96 +++---
 .../manager/service/sort/util/LoadNodeUtils.java   |  16 +-
 .../service/sort/util/NodeRelationShipUtils.java   |  30 +-
 .../service/sort/util/TransformNodeUtils.java      |   6 +-
 .../inlong/sort/protocol/BuiltInFieldInfo.java     |   1 +
 .../org/apache/inlong/sort/protocol/FieldInfo.java |  36 +--
 .../apache/inlong/sort/protocol/MetaFieldInfo.java | 127 ++++++++
 .../apache/inlong/sort/protocol/StreamInfo.java    |   8 +-
 .../inlong/sort/protocol/node/ExtractNode.java     |   8 +-
 .../apache/inlong/sort/protocol/node/LoadNode.java |  30 +-
 .../protocol/node/load/ClickHouseLoadNode.java     |   6 +-
 .../protocol/node/load/FileSystemLoadNode.java     |   6 +-
 .../sort/protocol/node/load/HbaseLoadNode.java     |   6 +-
 .../sort/protocol/node/load/HiveLoadNode.java      |   6 +-
 .../sort/protocol/node/load/IcebergLoadNode.java   |   6 +-
 .../sort/protocol/node/load/KafkaLoadNode.java     |   6 +-
 .../sort/protocol/node/load/MySqlLoadNode.java     |   6 +-
 .../sort/protocol/node/load/OracleLoadNode.java    |   6 +-
 .../sort/protocol/node/load/PostgresLoadNode.java  |   6 +-
 .../sort/protocol/node/load/SqlServerLoadNode.java |   6 +-
 .../protocol/node/load/TDSQLPostgresLoadNode.java  |   6 +-
 .../sort/protocol/node/transform/DistinctNode.java |   8 +-
 .../protocol/node/transform/TransformNode.java     |  14 +-
 .../{FieldRelationShip.java => FieldRelation.java} |  11 +-
 ...elationShip.java => FullOuterJoinRelation.java} |  12 +-
 ...elationShip.java => InnerJoinNodeRelation.java} |  12 +-
 .../{JoinRelationShip.java => JoinRelation.java}   |  22 +-
 ...ionShip.java => LeftOuterJoinNodeRelation.java} |   9 +-
 .../{NodeRelationShip.java => NodeRelation.java}   |  22 +-
 ...onShip.java => RightOuterJoinNodeRelation.java} |  17 +-
 ...odeRelationShip.java => UnionNodeRelation.java} |  10 +-
 .../apache/inlong/sort/protocol/FieldInfoTest.java |   2 +-
 .../apache/inlong/sort/protocol/GroupInfoTest.java |  18 +-
 ...odeRelationTest.java => MetaFieldInfoTest.java} |  12 +-
 .../inlong/sort/protocol/StreamInfoTest.java       |  28 +-
 .../protocol/node/load/ClickHouseLoadNodeTest.java |   4 +-
 .../sort/protocol/node/load/HbaseLoadNodeTest.java |   4 +-
 .../sort/protocol/node/load/HiveLoadNodeTest.java  |   4 +-
 .../sort/protocol/node/load/KafkaLoadNodeTest.java |   4 +-
 .../sort/protocol/node/load/MySqlLoadNodeTest.java |   4 +-
 .../protocol/node/load/OracleLoadNodeTest.java     |   4 +-
 .../protocol/node/load/PostgresLoadNodeTest.java   |   4 +-
 .../protocol/node/load/SqlServerLoadNodeTest.java  |   8 +-
 .../node/load/TDSQLPostgresLoadNodeTest.java       |   4 +-
 .../protocol/node/transform/DistinctNodeTest.java  |  10 +-
 ...elationShipTest.java => FieldRelationTest.java} |   8 +-
 .../relation/FullOuterJoinNodeRelationTest.java    |   8 +-
 .../relation/InnerJoinNodeRelationTest.java        |   8 +-
 .../relation/LeftOuterJoinNodeRelationTest.java    |   8 +-
 .../transformation/relation/NodeRelationTest.java  |   8 +-
 .../relation/RightOuterJoinNodeRelationTest.java   |   8 +-
 .../relation/UnionNodeRelationTest.java            |   8 +-
 .../inlong/sort/parser/impl/FlinkSqlParser.java    | 324 ++++++++++++++++-----
 .../apache/inlong/sort/parser/AllMigrateTest.java  |  12 +-
 .../sort/parser/ClickHouseSqlParserTest.java       |  14 +-
 .../sort/parser/DistinctNodeSqlParseTest.java      |  62 ++--
 .../apache/inlong/sort/parser/FilterParseTest.java |  18 +-
 .../inlong/sort/parser/FlinkSqlParserTest.java     |  38 +--
 .../sort/parser/FullOuterJoinSqlParseTest.java     |  54 ++--
 .../sort/parser/HbaseLoadFlinkSqlParseTest.java    |  12 +-
 .../sort/parser/IcebergNodeSqlParserTest.java      |  28 +-
 ...est.java => InnerJoinRelationSqlParseTest.java} |  68 ++---
 .../sort/parser/LeftOuterJoinSqlParseTest.java     |  54 ++--
 .../inlong/sort/parser/MetaFieldSyncTest.java      |  72 ++---
 .../sort/parser/MongoExtractFlinkSqlParseTest.java |  14 +-
 .../inlong/sort/parser/MySqlLoadSqlParseTest.java  |  16 +-
 .../sort/parser/OracleExtractSqlParseTest.java     |  16 +-
 .../inlong/sort/parser/OracleLoadSqlParseTest.java |  16 +-
 .../parser/PostgresExtractFlinkSqlParseTest.java   |  12 +-
 .../parser/PostgresLoadNodeFlinkSqlParseTest.java  |  12 +-
 .../inlong/sort/parser/PulsarSqlParserTest.java    |  14 +-
 .../sort/parser/RightOuterJoinSqlParseTest.java    |  54 ++--
 .../sort/parser/SqlServerNodeSqlParseTest.java     |  20 +-
 .../TDSQLPostgresLoadNodeFlinkSqlParseTest.java    |  12 +-
 76 files changed, 1003 insertions(+), 696 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 1a00a22f8..1a9032088 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -48,7 +48,7 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -115,7 +115,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
                         createNodesForStream(
                                 sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
                                 sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())),
-                        createNodeRelationshipsForStream(
+                        createNodeRelationsForStream(
                                 sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
                                 sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())))
                 ).collect(Collectors.toList());
@@ -168,17 +168,17 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
         return nodes;
     }
 
-    private List<NodeRelationShip> createNodeRelationshipsForStream(
+    private List<NodeRelation> createNodeRelationsForStream(
             List<SourceResponse> sourceResponses,
             List<SinkResponse> sinkResponses) {
-        NodeRelationShip relationship = new NodeRelationShip();
+        NodeRelation relation = new NodeRelation();
         List<String> inputs = sourceResponses.stream().map(SourceResponse::getSourceName)
                 .collect(Collectors.toList());
         List<String> outputs = sinkResponses.stream().map(SinkResponse::getSinkName)
                 .collect(Collectors.toList());
-        relationship.setInputs(inputs);
-        relationship.setOutputs(outputs);
-        return Lists.newArrayList(relationship);
+        relation.setInputs(inputs);
+        relation.setOutputs(outputs);
+        return Lists.newArrayList(relation);
     }
 
     private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index bf03da23a..55f2b0ad0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -48,7 +48,7 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -95,8 +95,8 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
         try {
             List<SourceResponse> sourceResponses = createPulsarSources(groupInfo, streamInfo);
             List<Node> nodes = createNodesForStream(sourceResponses, sinkResponses);
-            List<NodeRelationShip> nodeRelationships = createNodeRelationshipsForStream(sourceResponses, sinkResponses);
-            StreamInfo sortStreamInfo = new StreamInfo(streamId, nodes, nodeRelationships);
+            List<NodeRelation> nodeRelations = createNodeRelationsForStream(sourceResponses, sinkResponses);
+            StreamInfo sortStreamInfo = new StreamInfo(streamId, nodes, nodeRelations);
             GroupInfo sortGroupInfo = new GroupInfo(groupId, Lists.newArrayList(sortStreamInfo));
             String dataFlows = OBJECT_MAPPER.writeValueAsString(sortGroupInfo);
             InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
@@ -157,17 +157,17 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
         return nodes;
     }
 
-    private List<NodeRelationShip> createNodeRelationshipsForStream(
+    private List<NodeRelation> createNodeRelationsForStream(
             List<SourceResponse> sourceResponses,
             List<SinkResponse> sinkResponses) {
-        NodeRelationShip relationship = new NodeRelationShip();
+        NodeRelation relation = new NodeRelation();
         List<String> inputs = sourceResponses.stream().map(SourceResponse::getSourceName)
                 .collect(Collectors.toList());
         List<String> outputs = sinkResponses.stream().map(SinkResponse::getSinkName)
                 .collect(Collectors.toList());
-        relationship.setInputs(inputs);
-        relationship.setOutputs(outputs);
-        return Lists.newArrayList(relationship);
+        relation.setInputs(inputs);
+        relation.setOutputs(outputs);
+        return Lists.newArrayList(relation);
     }
 
     private void upsertDataFlow(InlongStreamInfo streamInfo, InlongStreamExtInfo extInfo, String keyName) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
similarity index 72%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
index 46216cc62..301914e73 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
@@ -36,7 +36,7 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.transformation.CascadeFunction;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
 import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
 import org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
@@ -49,14 +49,14 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * Util for creat field relationship.
+ * Util for creat field relation.
  */
-public class FieldRelationShipUtils {
+public class FieldRelationUtils {
 
     /**
-     * Create relationship of fields.
+     * Create relation of fields.
      */
-    public static List<FieldRelationShip> createFieldRelationShips(TransformResponse transformResponse) {
+    public static List<FieldRelation> createFieldRelations(TransformResponse transformResponse) {
         TransformType transformType = TransformType.forType(transformResponse.getTransformType());
         TransformDefinition transformDefinition = StreamParseUtils.parseTransformDefinition(
                 transformResponse.getTransformDefinition(), transformType);
@@ -66,15 +66,15 @@ public class FieldRelationShipUtils {
         switch (transformType) {
             case SPLITTER:
                 SplitterDefinition splitterDefinition = (SplitterDefinition) transformDefinition;
-                return createSplitterFieldRelationShips(fieldList, transformName, splitterDefinition, preNodes);
+                return createSplitterFieldRelations(fieldList, transformName, splitterDefinition, preNodes);
             case STRING_REPLACER:
                 StringReplacerDefinition replacerDefinition = (StringReplacerDefinition) transformDefinition;
-                return createReplacerFieldRelationShips(fieldList, transformName, replacerDefinition, preNodes);
+                return createReplacerFieldRelations(fieldList, transformName, replacerDefinition, preNodes);
             case DE_DUPLICATION:
             case FILTER:
-                return createFieldRelationShips(fieldList, transformName);
+                return createFieldRelations(fieldList, transformName);
             case JOINER:
-                return createJoinerFieldRelationShips(fieldList, transformName);
+                return createJoinerFieldRelations(fieldList, transformName);
             default:
                 throw new UnsupportedOperationException(
                         String.format("Unsupported transformType=%s for Inlong", transformType));
@@ -82,9 +82,9 @@ public class FieldRelationShipUtils {
     }
 
     /**
-     * Create relationship of fields.
+     * Create relation of fields.
      */
-    private static List<FieldRelationShip> createFieldRelationShips(List<StreamField> fieldList, String transformName) {
+    private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, String transformName) {
         return fieldList.stream()
                 .map(FieldInfoUtils::parseStreamField)
                 .map(fieldInfo -> {
@@ -92,15 +92,15 @@ public class FieldRelationShipUtils {
                             fieldInfo.getFormatInfo());
                     FieldInfo outputField = new FieldInfo(fieldInfo.getName(), transformName,
                             fieldInfo.getFormatInfo());
-                    return new FieldRelationShip(inputField, outputField);
+                    return new FieldRelation(inputField, outputField);
                 }).collect(Collectors.toList());
     }
 
     /**
-     * Create relationship of fields in join function.
+     * Create relation of fields in join function.
      */
-    private static List<FieldRelationShip> createJoinerFieldRelationShips(List<StreamField> fieldList,
-            String transformName) {
+    private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList,
+                                                                  String transformName) {
         return fieldList.stream()
                 .map(streamField -> {
                     FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
@@ -109,20 +109,21 @@ public class FieldRelationShipUtils {
                             streamField.getOriginNodeName(), formatInfo);
                     FieldInfo outputField = new FieldInfo(streamField.getFieldName(),
                             transformName, formatInfo);
-                    return new FieldRelationShip(inputField, outputField);
+                    return new FieldRelation(inputField, outputField);
                 }).collect(Collectors.toList());
     }
 
     /**
-     * Create relationship of fields in split function.
+     * Create relation of fields in split function.
      */
-    private static List<FieldRelationShip> createSplitterFieldRelationShips(List<StreamField> fieldList,
-            String transformName, SplitterDefinition splitterDefinition, String preNodes) {
+    private static List<FieldRelation> createSplitterFieldRelations(
+            List<StreamField> fieldList, String transformName,
+            SplitterDefinition splitterDefinition, String preNodes) {
         Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
         String preNode = preNodes.split(",")[0];
         List<SplitRule> splitRules = splitterDefinition.getSplitRules();
         Set<String> splitFields = Sets.newHashSet();
-        List<FieldRelationShip> fieldRelationships = splitRules.stream()
+        List<FieldRelation> fieldRelations = splitRules.stream()
                 .map(splitRule -> parseSplitRule(splitRule, splitFields, transformName, preNode))
                 .reduce(Lists.newArrayList(), (list1, list2) -> {
                     list1.addAll(list2);
@@ -131,57 +132,58 @@ public class FieldRelationShipUtils {
         List<StreamField> filteredFieldList = fieldList.stream()
                 .filter(streamFieldInfo -> !splitFields.contains(streamFieldInfo.getFieldName()))
                 .collect(Collectors.toList());
-        fieldRelationships.addAll(createFieldRelationShips(filteredFieldList, transformName));
-        return fieldRelationships;
+        fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName));
+        return fieldRelations;
     }
 
     /**
-     * Create relationship of fields in replace function.
+     * Create relation of fields in replace function.
      */
-    private static List<FieldRelationShip> createReplacerFieldRelationShips(List<StreamField> fieldList,
-            String transformName, StringReplacerDefinition replacerDefinition, String preNodes) {
+    private static List<FieldRelation> createReplacerFieldRelations(
+            List<StreamField> fieldList, String transformName,
+            StringReplacerDefinition replacerDefinition, String preNodes) {
         Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
         String preNode = preNodes.split(",")[0];
         List<ReplaceRule> replaceRules = replacerDefinition.getReplaceRules();
         Set<String> replaceFields = Sets.newHashSet();
-        List<FieldRelationShip> fieldRelationships = replaceRules.stream()
+        List<FieldRelation> fieldRelations = replaceRules.stream()
                 .map(replaceRule -> parseReplaceRule(replaceRule, replaceFields, transformName, preNode))
                 .collect(Collectors.toList());
-        fieldRelationships = cascadeFunctionRelationships(fieldRelationships);
+        fieldRelations = cascadeFunctionRelations(fieldRelations);
         List<StreamField> filteredFieldList = fieldList.stream()
                 .filter(streamFieldInfo -> !replaceFields.contains(streamFieldInfo.getFieldName()))
                 .collect(Collectors.toList());
-        fieldRelationships.addAll(createFieldRelationShips(filteredFieldList, transformName));
-        return fieldRelationships;
+        fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName));
+        return fieldRelations;
     }
 
     /**
-     * Create relationship of fields in cascade function.
+     * Create relation of fields in cascade function.
      */
-    private static List<FieldRelationShip> cascadeFunctionRelationships(List<FieldRelationShip> fieldRelationships) {
+    private static List<FieldRelation> cascadeFunctionRelations(List<FieldRelation> fieldRelations) {
         Map<String, List<CascadeFunction>> cascadeFunctions = Maps.newHashMap();
         Map<String, FieldInfo> targetFields = Maps.newHashMap();
-        for (FieldRelationShip fieldRelationship : fieldRelationships) {
-            CascadeFunction cascadeFunction = (CascadeFunction) fieldRelationship.getInputField();
-            String targetField = fieldRelationship.getOutputField().getName();
+        for (FieldRelation fieldRelation : fieldRelations) {
+            CascadeFunction cascadeFunction = (CascadeFunction) fieldRelation.getInputField();
+            String targetField = fieldRelation.getOutputField().getName();
             cascadeFunctions.computeIfAbsent(targetField, k -> Lists.newArrayList()).add(cascadeFunction);
-            targetFields.put(targetField, fieldRelationship.getOutputField());
+            targetFields.put(targetField, fieldRelation.getOutputField());
         }
-        List<FieldRelationShip> cascadeRelationships = Lists.newArrayList();
+        List<FieldRelation> cascadeRelations = Lists.newArrayList();
         for (Map.Entry<String, List<CascadeFunction>> entry : cascadeFunctions.entrySet()) {
             String targetField = entry.getKey();
             CascadeFunctionWrapper functionWrapper = new CascadeFunctionWrapper(entry.getValue());
             FieldInfo targetFieldInfo = targetFields.get(targetField);
-            cascadeRelationships.add(new FieldRelationShip(functionWrapper, targetFieldInfo));
+            cascadeRelations.add(new FieldRelation(functionWrapper, targetFieldInfo));
         }
-        return cascadeRelationships;
+        return cascadeRelations;
     }
 
     /**
      * Parse rule of replacer.
      */
-    private static FieldRelationShip parseReplaceRule(ReplaceRule replaceRule, Set<String> replaceFields,
-            String transformName, String preNode) {
+    private static FieldRelation parseReplaceRule(ReplaceRule replaceRule, Set<String> replaceFields,
+                                                  String transformName, String preNode) {
         StreamField sourceField = replaceRule.getSourceField();
         final String fieldName = sourceField.getFieldName();
         String regex = replaceRule.getRegex();
@@ -195,25 +197,25 @@ public class FieldRelationShipUtils {
         if (replaceMode == ReplaceMode.RELACE_ALL) {
             RegexpReplaceFunction regexpReplaceFunction = new RegexpReplaceFunction(fieldInfo,
                     new StringConstantParam(regex), new StringConstantParam(targetValue));
-            return new FieldRelationShip(regexpReplaceFunction, targetFieldInfo);
+            return new FieldRelation(regexpReplaceFunction, targetFieldInfo);
         } else {
             RegexpReplaceFirstFunction regexpReplaceFirstFunction = new RegexpReplaceFirstFunction(fieldInfo,
                     new StringConstantParam(regex), new StringConstantParam(targetValue));
-            return new FieldRelationShip(regexpReplaceFirstFunction, targetFieldInfo);
+            return new FieldRelation(regexpReplaceFirstFunction, targetFieldInfo);
         }
     }
 
     /**
      * Parse rule of split.
      */
-    private static List<FieldRelationShip> parseSplitRule(SplitRule splitRule, Set<String> splitFields,
-            String transformName, String preNode) {
+    private static List<FieldRelation> parseSplitRule(SplitRule splitRule, Set<String> splitFields,
+                                                      String transformName, String preNode) {
         StreamField sourceField = splitRule.getSourceField();
         FieldInfo fieldInfo = FieldInfoUtils.parseStreamField(sourceField);
         fieldInfo.setNodeId(preNode);
         String seperator = splitRule.getSeperator();
         List<String> targetSources = splitRule.getTargetFields();
-        List<FieldRelationShip> splitRelationships = Lists.newArrayList();
+        List<FieldRelation> splitRelations = Lists.newArrayList();
         for (int index = 0; index < targetSources.size(); index++) {
             SplitIndexFunction splitIndexFunction = new SplitIndexFunction(
                     fieldInfo, new StringConstantParam(seperator), new ConstantParam(index));
@@ -221,8 +223,8 @@ public class FieldRelationShipUtils {
                     targetSources.get(index), transformName, FieldInfoUtils.convertFieldFormat(FieldType.STRING.name())
             );
             splitFields.add(targetSources.get(index));
-            splitRelationships.add(new FieldRelationShip(splitIndexFunction, targetFieldInfo));
+            splitRelations.add(new FieldRelation(splitIndexFunction, targetFieldInfo));
         }
-        return splitRelationships;
+        return splitRelations;
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index cd677e911..1b6ca5fde 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -42,7 +42,7 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.List;
 import java.util.Map;
@@ -97,7 +97,7 @@ public class LoadNodeUtils {
         List<FieldInfo> fieldInfos = fieldList.stream()
                 .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, name))
                 .collect(Collectors.toList());
-        List<FieldRelationShip> fieldRelationships = parseSinkFields(fieldList, name);
+        List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
         Map<String, String> properties = kafkaSinkResponse.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
         Integer sinkParallelism = null;
@@ -154,7 +154,7 @@ public class LoadNodeUtils {
         List<FieldInfo> fields = fieldList.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
                 .collect(Collectors.toList());
-        List<FieldRelationShip> fieldRelationships = parseSinkFields(fieldList, name);
+        List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
         Map<String, String> properties = hiveSinkResponse.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
         List<FieldInfo> partitionFields = Lists.newArrayList();
@@ -193,7 +193,7 @@ public class LoadNodeUtils {
         List<FieldInfo> fields = fieldList.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
                 .collect(Collectors.toList());
-        List<FieldRelationShip> fieldRelationships = parseSinkFields(fieldList, name);
+        List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
         Map<String, String> properties = sinkResponse.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
         return new HbaseLoadNode(
@@ -225,7 +225,7 @@ public class LoadNodeUtils {
         List<FieldInfo> fields = fieldList.stream()
                 .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
                 .collect(Collectors.toList());
-        List<FieldRelationShip> fieldRelationships = parseSinkFields(fieldList, name);
+        List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
         return new PostgresLoadNode(postgresSinkResponse.getSinkName(),
                 postgresSinkResponse.getSinkName(),
                 fields, fieldRelationships, null, null, 1,
@@ -245,7 +245,7 @@ public class LoadNodeUtils {
                 .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse,
                         name))
                 .collect(Collectors.toList());
-        List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
+        List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
         return new ClickHouseLoadNode(clickHouseSinkResponse.getSinkName(),
                 clickHouseSinkResponse.getSinkName(),
                 fields, fieldRelationShips, null, null, 1,
@@ -258,7 +258,7 @@ public class LoadNodeUtils {
     /**
      * Parse information field of data sink.
      */
-    public static List<FieldRelationShip> parseSinkFields(List<SinkField> fieldList, String sinkName) {
+    public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, String sinkName) {
         if (CollectionUtils.isEmpty(fieldList)) {
             return Lists.newArrayList();
         }
@@ -274,7 +274,7 @@ public class LoadNodeUtils {
                     String sourceFieldType = field.getSourceFieldType();
                     FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
                             FieldInfoUtils.convertFieldFormat(sourceFieldType));
-                    return new FieldRelationShip(sourceField, sinkField);
+                    return new FieldRelation(sourceField, sinkField);
                 }).collect(Collectors.toList());
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index fb8078384..b9791ea64 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -45,10 +45,10 @@ import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilter
 import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelation;
 
 import java.util.Iterator;
 import java.util.List;
@@ -64,7 +64,7 @@ public class NodeRelationShipUtils {
     /**
      * Create node relationship for the given stream
      */
-    public static List<NodeRelationShip> createNodeRelationShipsForStream(InlongStreamInfo streamInfo) {
+    public static List<NodeRelation> createNodeRelationShipsForStream(InlongStreamInfo streamInfo) {
         String tempView = streamInfo.getExtParams();
         if (StringUtils.isEmpty(tempView)) {
             log.warn("StreamNodeRelationShip is empty for Stream={}", streamInfo);
@@ -73,7 +73,7 @@ public class NodeRelationShipUtils {
         StreamPipeline pipeline = StreamParseUtils.parseStreamPipeline(streamInfo.getExtParams(),
                 streamInfo.getInlongStreamId());
         return pipeline.getPipeline().stream()
-                .map(nodeRelationship -> new NodeRelationShip(
+                .map(nodeRelationship -> new NodeRelation(
                         Lists.newArrayList(nodeRelationship.getInputNodes()),
                         Lists.newArrayList(nodeRelationship.getOutputNodes())))
                 .collect(Collectors.toList());
@@ -100,11 +100,11 @@ public class NodeRelationShipUtils {
                     return transformDefinition.getTransformType() == TransformType.JOINER;
                 }).collect(Collectors.toMap(TransformNode::getName, transformNode -> transformNode));
 
-        List<NodeRelationShip> relationships = streamInfo.getRelations();
-        Iterator<NodeRelationShip> shipIterator = relationships.listIterator();
-        List<NodeRelationShip> joinRelationships = Lists.newArrayList();
+        List<NodeRelation> relationships = streamInfo.getRelations();
+        Iterator<NodeRelation> shipIterator = relationships.listIterator();
+        List<NodeRelation> joinRelationships = Lists.newArrayList();
         while (shipIterator.hasNext()) {
-            NodeRelationShip relationship = shipIterator.next();
+            NodeRelation relationship = shipIterator.next();
             List<String> outputs = relationship.getOutputs();
             if (outputs.size() == 1) {
                 String nodeName = outputs.get(0);
@@ -119,8 +119,8 @@ public class NodeRelationShipUtils {
         relationships.addAll(joinRelationships);
     }
 
-    private static NodeRelationShip createNodeRelationShip(JoinerDefinition joinerDefinition,
-            NodeRelationShip nodeRelationship) {
+    private static NodeRelation createNodeRelationShip(JoinerDefinition joinerDefinition,
+                                                       NodeRelation nodeRelationship) {
         JoinMode joinMode = joinerDefinition.getJoinMode();
         String leftNode = getNodeName(joinerDefinition.getLeftNode());
         String rightNode = getNodeName(joinerDefinition.getRightNode());
@@ -143,11 +143,11 @@ public class NodeRelationShipUtils {
         joinConditions.put(rightNode, filterFunctions);
         switch (joinMode) {
             case LEFT_JOIN:
-                return new LeftOuterJoinNodeRelationShip(preNodes, nodeRelationship.getOutputs(), joinConditions);
+                return new LeftOuterJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
             case INNER_JOIN:
-                return new InnerJoinNodeRelationShip(preNodes, nodeRelationship.getOutputs(), joinConditions);
+                return new InnerJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
             case RIGHT_JOIN:
-                return new RightOuterJoinNodeRelationShip(preNodes, nodeRelationship.getOutputs(), joinConditions);
+                return new RightOuterJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported join mode=%s for inlong", joinMode));
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index 12d211996..4f480ac32 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -63,7 +63,7 @@ public class TransformNodeUtils {
      * Create distinct node based on deDuplicationDefinition
      */
     public static DistinctNode createDistinctNode(DeDuplicationDefinition deDuplicationDefinition,
-            TransformResponse transformResponse) {
+                                                  TransformResponse transformResponse) {
         List<StreamField> streamFields = deDuplicationDefinition.getDupFields();
         List<FieldInfo> distinctFields = streamFields.stream()
                 .map(FieldInfoUtils::parseStreamField)
@@ -87,7 +87,7 @@ public class TransformNodeUtils {
         return new DistinctNode(transformNode.getId(),
                 transformNode.getName(),
                 transformNode.getFields(),
-                transformNode.getFieldRelationShips(),
+                transformNode.getFieldRelations(),
                 transformNode.getFilters(),
                 transformNode.getFilterStrategy(),
                 distinctFields,
@@ -106,7 +106,7 @@ public class TransformNodeUtils {
         List<FieldInfo> fieldInfos = transformResponse.getFieldList().stream()
                 .map(FieldInfoUtils::parseStreamField).collect(Collectors.toList());
         transformNode.setFields(fieldInfos);
-        transformNode.setFieldRelationShips(FieldRelationShipUtils.createFieldRelationShips(transformResponse));
+        transformNode.setFieldRelations(FieldRelationUtils.createFieldRelations(transformResponse));
         transformNode.setFilters(
                 FilterFunctionUtils.createFilterFunctions(transformResponse));
         transformNode.setFilterStrategy(FilterFunctionUtils.parseFilterStrategy(transformResponse));
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
index 56d728a15..d1506ad54 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
@@ -24,6 +24,7 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
 /**
  * built-in field info.
  */
+@Deprecated
 public class BuiltInFieldInfo extends FieldInfo {
 
     private static final long serialVersionUID = -3436204467879205139L;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
index 3f2e1e0d3..453a049b0 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
@@ -30,8 +30,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.protocol.transformation.FunctionParam;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.Objects;
 
 @JsonTypeInfo(
         use = JsonTypeInfo.Id.NAME,
@@ -39,6 +39,7 @@ import java.util.Objects;
         property = "type")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
+        @JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField"),
         @JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin")
 })
 @Data
@@ -52,24 +53,31 @@ public class FieldInfo implements FunctionParam, Serializable {
     private String nodeId;
     @JsonIgnore
     private String tableNameAlias;
+    /**
+     * It will be null if the field is a meta field
+     */
+    @Nullable
     @JsonProperty("formatInfo")
     private FormatInfo formatInfo;
 
     public FieldInfo(
             @JsonProperty("name") String name,
             @JsonProperty("formatInfo") FormatInfo formatInfo) {
-        this.name = Preconditions.checkNotNull(name);
-        this.formatInfo = Preconditions.checkNotNull(formatInfo);
+        this(name, null, formatInfo);
+    }
+
+    public FieldInfo(@JsonProperty("name") String name) {
+        this(name, null, null);
     }
 
     @JsonCreator
     public FieldInfo(
             @JsonProperty("name") String name,
             @JsonProperty("nodeId") String nodeId,
-            @JsonProperty("formatInfo") FormatInfo formatInfo) {
+            @Nullable @JsonProperty("formatInfo") FormatInfo formatInfo) {
         this.name = Preconditions.checkNotNull(name);
         this.nodeId = nodeId;
-        this.formatInfo = Preconditions.checkNotNull(formatInfo);
+        this.formatInfo = formatInfo;
     }
 
     @Override
@@ -86,22 +94,4 @@ public class FieldInfo implements FunctionParam, Serializable {
         }
         return formatName;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        FieldInfo fieldInfo = (FieldInfo) o;
-        return name.equals(fieldInfo.name) && formatInfo.equals(fieldInfo.formatInfo);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(name, formatInfo);
-    }
-
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
new file mode 100644
index 000000000..bd3d389d6
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import lombok.Getter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Field info for a metadata field (see {@link MetaField}); it is always created without format info.
+ */
+@Getter
+public class MetaFieldInfo extends FieldInfo {
+
+    private static final long serialVersionUID = -3436204467879205139L;
+
+    @JsonProperty("metaField")
+    private final MetaField metaField;
+
+    @JsonCreator
+    public MetaFieldInfo(
+            @JsonProperty("name") String name,
+            @JsonProperty("nodeId") String nodeId,
+            @JsonProperty("metaField") MetaField metaField) {
+        super(name, nodeId, null);
+        this.metaField = metaField;
+    }
+
+    public MetaFieldInfo(
+            @JsonProperty("name") String name,
+            @JsonProperty("metaField") MetaField metaField) {
+        super(name);
+        this.metaField = metaField;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+            return false;
+        }
+        return metaField == ((MetaFieldInfo) o).metaField; // enum constants are compared by identity
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * super.hashCode() + (metaField == null ? 0 : metaField.hashCode()); // matches equals
+    }
+
+    public enum MetaField {
+
+        /**
+         * The process time of flink
+         */
+        PROCESS_TIME,
+        /**
+         * Name of the schema that contains the row. Currently, it is used for Oracle database.
+         */
+        SCHEMA_NAME,
+        /**
+         * Name of the database that contains the row.
+         */
+        DATABASE_NAME,
+        /**
+         * Name of the table that contains the row.
+         */
+        TABLE_NAME,
+        /**
+         * It indicates the time that the change was made in the database.
+         * If the record is read from snapshot of the table instead of the change stream, the value is always 0
+         */
+        OP_TS,
+        /**
+         * Whether the statement is a DDL statement. Currently, it is used for MySQL database.
+         */
+        IS_DDL,
+        /**
+         * Type of database operation, such as INSERT/DELETE, etc. Currently, it is used for MySQL database.
+         */
+        OP_TYPE,
+        /**
+         * MySQL binlog data Row. Currently, it is used for MySQL database.
+         */
+        DATA,
+        /**
+         * The value of the field before update. Currently, it is used for MySQL database.
+         */
+        UPDATE_BEFORE,
+        /**
+         * Batch id of binlog. Currently, it is used for MySQL database.
+         */
+        BATCH_ID,
+        /**
+         * Mapping of sql_type table fields to java data type IDs. Currently, it is used for MySQL database.
+         */
+        SQL_TYPE,
+        /**
+         * The current time when the ROW was received and processed. Currently, it is used for MySQL database.
+         */
+        TS,
+        /**
+         * The table structure. It is only used for MySQL database
+         */
+        MYSQL_TYPE,
+        /**
+         * Primary key field name. Currently, it is used for MySQL database.
+         */
+        PK_NAMES
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
index 64bf40480..8bcdf8de6 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
@@ -22,7 +22,7 @@ import lombok.Data;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 
 import java.io.Serializable;
 import java.util.List;
@@ -41,7 +41,7 @@ public class StreamInfo implements Serializable {
     @JsonProperty("nodes")
     private List<Node> nodes;
     @JsonProperty("relations")
-    private List<NodeRelationShip> relations;
+    private List<NodeRelation> relations;
 
     /**
      * Information of stream.
@@ -49,11 +49,11 @@ public class StreamInfo implements Serializable {
      * @param streamId Uniquely identifies of GroupInfo
      * @param nodes The node list that StreamInfo contains
      * @param relations The relation list that StreamInfo contains,
-     *         it represents the relationship between nodes of StreamInfo
+     *         it represents the relation between nodes of StreamInfo
      */
     @JsonCreator
     public StreamInfo(@JsonProperty("streamId") String streamId, @JsonProperty("nodes") List<Node> nodes,
-            @JsonProperty("relations") List<NodeRelationShip> relations) {
+            @JsonProperty("relations") List<NodeRelation> relations) {
         this.streamId = Preconditions.checkNotNull(streamId, "streamId is null");
         this.nodes = Preconditions.checkNotNull(nodes, "nodes is null");
         Preconditions.checkState(!nodes.isEmpty(), "nodes is empty");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index 6fa212564..b17dbf535 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -78,10 +78,10 @@ public abstract class ExtractNode implements Node {
 
     @JsonCreator
     public ExtractNode(@JsonProperty("id") String id,
-            @JsonProperty("name") String name,
-            @JsonProperty("fields") List<FieldInfo> fields,
-            @Nullable @JsonProperty("watermark_field") WatermarkField watermarkField,
-            @JsonProperty("properties") Map<String, String> properties) {
+                       @JsonProperty("name") String name,
+                       @JsonProperty("fields") List<FieldInfo> fields,
+                       @Nullable @JsonProperty("watermark_field") WatermarkField watermarkField,
+                       @Nullable @JsonProperty("properties") Map<String, String> properties) {
         this.id = Preconditions.checkNotNull(id, "id is null");
         this.name = name;
         this.fields = Preconditions.checkNotNull(fields, "fields is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 6fbfc24a8..283e5e5ac 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -34,11 +34,10 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
-import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
 import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
 import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import javax.annotation.Nullable;
@@ -61,8 +60,7 @@ import java.util.Map;
         @JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad"),
         @JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
         @JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
-        @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
-        @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad")
+        @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad")
 })
 @NoArgsConstructor
 @Data
@@ -75,8 +73,8 @@ public abstract class LoadNode implements Node {
     private String name;
     @JsonProperty("fields")
     private List<FieldInfo> fields;
-    @JsonProperty("fieldRelationShips")
-    private List<FieldRelationShip> fieldRelationShips;
+    @JsonProperty("fieldRelations")
+    private List<FieldRelation> fieldRelations;
     @Nullable
     @JsonInclude(Include.NON_NULL)
     @JsonProperty("sinkParallelism")
@@ -94,20 +92,20 @@ public abstract class LoadNode implements Node {
 
     @JsonCreator
     public LoadNode(@JsonProperty("id") String id,
-            @JsonProperty("name") String name,
-            @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
-            @JsonProperty("filters") List<FilterFunction> filters,
-            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
-            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
-            @Nullable @JsonProperty("properties") Map<String, String> properties) {
+                    @JsonProperty("name") String name,
+                    @JsonProperty("fields") List<FieldInfo> fields,
+                    @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+                    @JsonProperty("filters") List<FilterFunction> filters,
+                    @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+                    @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+                    @Nullable @JsonProperty("properties") Map<String, String> properties) {
         this.id = Preconditions.checkNotNull(id, "id is null");
         this.name = name;
         this.fields = Preconditions.checkNotNull(fields, "fields is null");
         Preconditions.checkState(!fields.isEmpty(), "fields is empty");
-        this.fieldRelationShips = Preconditions.checkNotNull(fieldRelationShips,
-                "fieldRelationShips is null");
-        Preconditions.checkState(!fieldRelationShips.isEmpty(), "fieldRelationShips is empty");
+        this.fieldRelations = Preconditions.checkNotNull(fieldRelations,
+                "fieldRelations is null");
+        Preconditions.checkState(!fieldRelations.isEmpty(), "fieldRelations is empty");
         this.filters = filters;
         this.filterStrategy = filterStrategy;
         this.sinkParallelism = sinkParallelism;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
index 8716f0bd8..92971a0db 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
@@ -28,7 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import javax.annotation.Nonnull;
@@ -65,7 +65,7 @@ public class ClickHouseLoadNode extends LoadNode implements Serializable {
     public ClickHouseLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -74,7 +74,7 @@ public class ClickHouseLoadNode extends LoadNode implements Serializable {
             @Nonnull @JsonProperty("url") String url,
             @Nonnull @JsonProperty("userName") String userName,
             @Nonnull @JsonProperty("passWord") String password) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
         this.url = Preconditions.checkNotNull(url, "url is null");
         this.userName = Preconditions.checkNotNull(userName, "userName is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
index 172b80215..8839cceb3 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import javax.annotation.Nonnull;
@@ -70,7 +70,7 @@ public class FileSystemLoadNode extends LoadNode implements Serializable {
     public FileSystemLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @Nonnull @JsonProperty("path") String path,
             @Nonnull @JsonProperty("format") String format,
@@ -78,7 +78,7 @@ public class FileSystemLoadNode extends LoadNode implements Serializable {
             @JsonProperty("properties") Map<String, String> properties,
             @JsonProperty("parFields") List<FieldInfo> partitionFields,
             @JsonProperty("serverTimeZone") String serverTimeZone) {
-        super(id, name, fields, fieldRelationShips, filters, null, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, null, sinkParallelism, properties);
         this.format = Preconditions.checkNotNull(format, "format type is null");
         this.path = Preconditions.checkNotNull(path, "path is null");
         this.partitionFields = partitionFields;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
index 3841454d9..1a28cad2a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
@@ -30,7 +30,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.constant.HBaseConstant;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import java.io.Serializable;
@@ -76,7 +76,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
     public HbaseLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -89,7 +89,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
             @JsonProperty("zookeeperZnodeParent") String zookeeperZnodeParent,
             @JsonProperty("sinkBufferFlushMaxRows") String sinkBufferFlushMaxRows,
             @JsonProperty("sinkBufferFlushInterval") String sinkBufferFlushInterval) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.tableName = Preconditions.checkNotNull(tableName, "tableName of hbase is null");
         this.nameSpace = Preconditions.checkNotNull(nameSpace, "nameSpace of hbase is null");
         this.zookeeperQuorum = Preconditions.checkNotNull(zookeeperQuorum, "zookeeperQuorum of hbase is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
index 90e3be5c6..151d5370f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
@@ -29,7 +29,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import javax.annotation.Nonnull;
@@ -83,7 +83,7 @@ public class HiveLoadNode extends LoadNode implements Serializable {
     public HiveLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -95,7 +95,7 @@ public class HiveLoadNode extends LoadNode implements Serializable {
             @JsonProperty("hiveVersion") String hiveVersion,
             @JsonProperty("hadoopConfDir") String hadoopConfDir,
             @JsonProperty("parFields") List<FieldInfo> partitionFields) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.database = Preconditions.checkNotNull(database, "database of hive is null");
         this.tableName = Preconditions.checkNotNull(tableName, "table of hive is null");
         this.hiveConfDir = Preconditions.checkNotNull(hiveConfDir, "hive conf directory is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 2435a827a..2d649c288 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -30,7 +30,7 @@ import org.apache.inlong.sort.protocol.constant.IcebergConstant;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import javax.annotation.Nonnull;
@@ -71,7 +71,7 @@ public class IcebergLoadNode extends LoadNode implements Serializable {
     public IcebergLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -82,7 +82,7 @@ public class IcebergLoadNode extends LoadNode implements Serializable {
             @JsonProperty("catalogType") IcebergConstant.CatalogType catalogType,
             @JsonProperty("uri") String uri,
             @JsonProperty("warehouse") String warehouse) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
         this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
         this.primaryKey = primaryKey;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 1542f5708..c65aa5dba 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -34,7 +34,7 @@ import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import javax.annotation.Nonnull;
@@ -72,7 +72,7 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
     public KafkaLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @Nonnull @JsonProperty("topic") String topic,
@@ -81,7 +81,7 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
             @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
             @JsonProperty("properties") Map<String, String> properties,
             @JsonProperty("primaryKey") String primaryKey) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.topic = Preconditions.checkNotNull(topic, "topic is null");
         this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrapServers is null");
         this.format = Preconditions.checkNotNull(format, "format is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNode.java
index 9ed44d7dc..243b96dc4 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNode.java
@@ -27,7 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import java.io.Serializable;
@@ -61,7 +61,7 @@ public class MySqlLoadNode extends LoadNode implements Serializable {
             @JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -71,7 +71,7 @@ public class MySqlLoadNode extends LoadNode implements Serializable {
             @JsonProperty("password") String password,
             @JsonProperty("tableName") String tableName,
             @JsonProperty("primaryKey") String primaryKey) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.url = Preconditions.checkNotNull(url, "url is null");
         this.username = Preconditions.checkNotNull(username, "username is null");
         this.password = Preconditions.checkNotNull(password, "password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
index c725c07e1..26ef17ec1 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
@@ -27,7 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import java.io.Serializable;
@@ -61,7 +61,7 @@ public class OracleLoadNode extends LoadNode implements Serializable {
             @JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -71,7 +71,7 @@ public class OracleLoadNode extends LoadNode implements Serializable {
             @JsonProperty("password") String password,
             @JsonProperty("tableName") String tableName,
             @JsonProperty("primaryKey") String primaryKey) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.url = Preconditions.checkNotNull(url, "url is null");
         this.username = Preconditions.checkNotNull(username, "username is null");
         this.password = Preconditions.checkNotNull(password, "password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNode.java
index 214479e0b..6c8e6d890 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNode.java
@@ -29,7 +29,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.constant.PostgresConstant;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import java.io.Serializable;
@@ -73,7 +73,7 @@ public class PostgresLoadNode extends LoadNode implements Serializable {
             @JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -83,7 +83,7 @@ public class PostgresLoadNode extends LoadNode implements Serializable {
             @JsonProperty("password") String password,
             @JsonProperty("tableName") String tableName,
             @JsonProperty("primaryKey") String primaryKey) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.url = Preconditions.checkNotNull(url, "url is null");
         this.username = Preconditions.checkNotNull(username, "username is null");
         this.password = Preconditions.checkNotNull(password, "password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNode.java
index 59a91be16..63b35e901 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNode.java
@@ -27,7 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import java.io.Serializable;
@@ -70,7 +70,7 @@ public class SqlServerLoadNode extends LoadNode implements Serializable {
     public SqlServerLoadNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -81,7 +81,7 @@ public class SqlServerLoadNode extends LoadNode implements Serializable {
             @JsonProperty(value = "schemaName", defaultValue = "dbo") String schemaName,
             @JsonProperty("tableName") String tableName,
             @JsonProperty("primaryKey") String primaryKey) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.url = Preconditions.checkNotNull(url, "sqlserver url is null");
         this.username = Preconditions.checkNotNull(username, "sqlserver user name is null");
         this.password = Preconditions.checkNotNull(password, "sqlserver password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNode.java
index e0864dba1..9432180dd 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNode.java
@@ -29,7 +29,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.constant.PostgresConstant;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import java.io.Serializable;
@@ -77,7 +77,7 @@ public class TDSQLPostgresLoadNode extends LoadNode implements Serializable {
             @JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -87,7 +87,7 @@ public class TDSQLPostgresLoadNode extends LoadNode implements Serializable {
             @JsonProperty("password") String password,
             @JsonProperty("tableName") String tableName,
             @JsonProperty("primaryKey") String primaryKey) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
         this.url = Preconditions.checkNotNull(url, "url is null");
         this.username = Preconditions.checkNotNull(username, "username is null");
         this.password = Preconditions.checkNotNull(password, "password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/DistinctNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/DistinctNode.java
index f1bcfa344..a54886645 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/DistinctNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/DistinctNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.OrderDirection;
 
@@ -63,7 +63,7 @@ public class DistinctNode extends TransformNode {
      * @param id node id
      * @param name node name
      * @param fields The fields used to describe node schema
-     * @param fieldRelationShips field relationShips ysed to describe node relationships
+     * @param fieldRelations field relations used to describe the relation between fields
      * @param filters The filters used for data filter
      * @param distinctFields The distinct fields used for partition
      * @param orderField the order field used for sorting in partition
@@ -74,13 +74,13 @@ public class DistinctNode extends TransformNode {
     public DistinctNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
             @JsonProperty("distinctFields") List<FieldInfo> distinctFields,
             @JsonProperty("orderField") FieldInfo orderField,
             @JsonProperty("orderDirection") OrderDirection orderDirection) {
-        super(id, name, fields, fieldRelationShips, filters, filterStrategy);
+        super(id, name, fields, fieldRelations, filters, filterStrategy);
         this.distinctFields = Preconditions.checkNotNull(distinctFields, "distinctFields is null");
         Preconditions.checkState(!distinctFields.isEmpty(), "distinct fields is empty");
         this.orderField = Preconditions.checkNotNull(orderField, "orderField is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
index 8cfd272c8..886315b43 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
@@ -30,7 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 
 import java.io.Serializable;
@@ -60,8 +60,8 @@ public class TransformNode implements Node, Serializable {
     private String name;
     @JsonProperty("fields")
     private List<FieldInfo> fields;
-    @JsonProperty("fieldRelationShips")
-    private List<FieldRelationShip> fieldRelationShips;
+    @JsonProperty("fieldRelations")
+    private List<FieldRelation> fieldRelations;
     @JsonProperty("filters")
     @JsonInclude(Include.NON_NULL)
     private List<FilterFunction> filters;
@@ -73,16 +73,16 @@ public class TransformNode implements Node, Serializable {
     public TransformNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
             @JsonProperty("fields") List<FieldInfo> fields,
-            @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
             @JsonProperty("filters") List<FilterFunction> filters,
             @JsonProperty("filterStrategy") FilterStrategy filterStrategy) {
         this.id = Preconditions.checkNotNull(id, "id is null");
         this.name = name;
         this.fields = Preconditions.checkNotNull(fields, "fields is null");
         Preconditions.checkState(!fields.isEmpty(), "fields is empty");
-        this.fieldRelationShips = Preconditions.checkNotNull(fieldRelationShips,
-                "fieldRelationShips is null");
-        Preconditions.checkState(!fieldRelationShips.isEmpty(), "fieldRelationShips is empty");
+        this.fieldRelations = Preconditions.checkNotNull(fieldRelations,
+                "fieldRelations is null");
+        Preconditions.checkState(!fieldRelations.isEmpty(), "fieldRelations is empty");
         this.filters = filters;
         this.filterStrategy = filterStrategy;
     }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
similarity index 85%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
index 7ae103342..1955dcb29 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
@@ -27,17 +27,16 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 
 
 /**
- * defines the relationship between fields
- * from input to output field
+ * Defines the relation between fields, from an input field to an output field
  */
 @JsonTypeInfo(
         use = JsonTypeInfo.Id.NAME,
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
-@JsonTypeName("fieldRelationShip")
+@JsonTypeName("fieldRelation")
 @Data
 @NoArgsConstructor
-public class FieldRelationShip {
+public class FieldRelation {
 
     @JsonProperty("inputField")
     private FunctionParam inputField;
@@ -45,8 +44,8 @@ public class FieldRelationShip {
     private FieldInfo outputField;
 
     @JsonCreator
-    public FieldRelationShip(@JsonProperty("inputField") FunctionParam inputField,
-            @JsonProperty("outputField") FieldInfo outputField) {
+    public FieldRelation(@JsonProperty("inputField") FunctionParam inputField,
+                         @JsonProperty("outputField") FieldInfo outputField) {
         this.inputField = inputField;
         this.outputField = outputField;
     }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelation.java
similarity index 81%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelation.java
index 71a042a2c..a32de63ec 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelation.java
@@ -29,18 +29,18 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Full outer join relationship class which defines the full outer join relationship
+ * Full outer join relation class which defines the full outer join relation
  */
 @JsonTypeName("fullOuterJoin")
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public class FullOuterJoinRelationShip extends JoinRelationShip {
+public class FullOuterJoinRelation extends JoinRelation {
 
     private static final long serialVersionUID = -2551119250767202829L;
 
     /**
-     * FullOuterJoinRelationShip Constructor
+     * FullOuterJoinRelation Constructor
      *
      * @param inputs The inputs is a list of input node id
      * @param outputs The outputs is a list of output node id
@@ -49,9 +49,9 @@ public class FullOuterJoinRelationShip extends JoinRelationShip {
      *         the value of joinConditionMap is a list of join contidition
      */
     @JsonCreator
-    public FullOuterJoinRelationShip(@JsonProperty("inputs") List<String> inputs,
-            @JsonProperty("outputs") List<String> outputs,
-            @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
+    public FullOuterJoinRelation(@JsonProperty("inputs") List<String> inputs,
+                                 @JsonProperty("outputs") List<String> outputs,
+                                 @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
         super(inputs, outputs, joinConditionMap);
     }
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelation.java
similarity index 81%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelation.java
index 57c64cbd7..3c9703dc0 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelation.java
@@ -29,18 +29,18 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Inner join relationship class which defines the inner join relationship
+ * Inner join relation class which defines the inner join relation
  */
 @JsonTypeName("innerJoin")
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public class InnerJoinNodeRelationShip extends JoinRelationShip {
+public class InnerJoinNodeRelation extends JoinRelation {
 
     private static final long serialVersionUID = -5446480979888656724L;
 
     /**
-     * InnerJoinNodeRelationShip Constructor
+     * InnerJoinNodeRelation Constructor
      *
      * @param inputs The inputs is a list of input node id
      * @param outputs The outputs is a list of output node id
@@ -49,9 +49,9 @@ public class InnerJoinNodeRelationShip extends JoinRelationShip {
      *         the value of joinConditionMap is a list of join contidition
      */
     @JsonCreator
-    public InnerJoinNodeRelationShip(@JsonProperty("inputs") List<String> inputs,
-            @JsonProperty("outputs") List<String> outputs,
-            @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
+    public InnerJoinNodeRelation(@JsonProperty("inputs") List<String> inputs,
+                                 @JsonProperty("outputs") List<String> outputs,
+                                 @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
         super(inputs, outputs, joinConditionMap);
     }
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
similarity index 75%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
index 961175221..0ea75faf7 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
@@ -37,15 +37,15 @@ import java.util.Map;
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = FullOuterJoinRelationShip.class, name = "fullOuterJoin"),
-        @JsonSubTypes.Type(value = InnerJoinNodeRelationShip.class, name = "innerJoin"),
-        @JsonSubTypes.Type(value = LeftOuterJoinNodeRelationShip.class, name = "leftOuterJoin"),
-        @JsonSubTypes.Type(value = RightOuterJoinNodeRelationShip.class, name = "rightOutJoin")
+        @JsonSubTypes.Type(value = FullOuterJoinRelation.class, name = "fullOuterJoin"),
+        @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"),
+        @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"),
+        @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin")
 })
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public abstract class JoinRelationShip extends NodeRelationShip {
+public abstract class JoinRelation extends NodeRelation {
 
     private static final long serialVersionUID = -213673939512251116L;
 
@@ -53,7 +53,7 @@ public abstract class JoinRelationShip extends NodeRelationShip {
     private Map<String, List<FilterFunction>> joinConditionMap;
 
     /**
-     * JoinRelationShip Constructor
+     * JoinRelation Constructor
      *
      * @param inputs The inputs is a list of input node id
      * @param outputs The outputs is a list of output node id
@@ -61,17 +61,17 @@ public abstract class JoinRelationShip extends NodeRelationShip {
      *         the key of joinConditionMap is the node id of join node
      *         the value of joinConditionMap is a list of join conditions
      */
-    public JoinRelationShip(@JsonProperty("inputs") List<String> inputs,
-            @JsonProperty("outputs") List<String> outputs,
-            @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
+    public JoinRelation(@JsonProperty("inputs") List<String> inputs,
+                        @JsonProperty("outputs") List<String> outputs,
+                        @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
         super(inputs, outputs);
         this.joinConditionMap = Preconditions.checkNotNull(joinConditionMap, "joinConditionMap is null");
         Preconditions.checkState(!joinConditionMap.isEmpty(), "joinConditionMap is empty");
     }
 
     /**
-     * Node relationship format
-     * that is, the relationship is converted into a string representation of SQL
+     * Node relation format
+     * that is, the relation is converted into a string representation of SQL
      *
      * @return a string representation of SQL
      */
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelation.java
similarity index 88%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelation.java
index 193f2fc7d..e6a57ef20 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelation.java
@@ -29,18 +29,18 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Left outer join relationship class which defines the left outer join relationship
+ * Left outer join relation class which defines the left outer join relation
  */
 @JsonTypeName("leftOuterJoin")
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public class LeftOuterJoinNodeRelationShip extends JoinRelationShip {
+public class LeftOuterJoinNodeRelation extends JoinRelation {
 
     private static final long serialVersionUID = -2982848817690520421L;
 
     /**
-     * LeftOuterJoinNodeRelationShip Constructor
+     * LeftOuterJoinNodeRelation Constructor
      *
      * @param inputs The inputs is a list of input node id
      * @param outputs The outputs is a list of output node id
@@ -49,7 +49,8 @@ public class LeftOuterJoinNodeRelationShip extends JoinRelationShip {
      *         the value of joinConditionMap is a list of join conditions
      */
     @JsonCreator
-    public LeftOuterJoinNodeRelationShip(@JsonProperty("inputs") List<String> inputs,
+    public LeftOuterJoinNodeRelation(
+            @JsonProperty("inputs") List<String> inputs,
             @JsonProperty("outputs") List<String> outputs,
             @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
         super(inputs, outputs, joinConditionMap);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
similarity index 72%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
index a2b0bb1e6..0179e315d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
@@ -29,23 +29,23 @@ import java.io.Serializable;
 import java.util.List;
 
 /**
- * Node relationship base class which defines the simplest one-to-one relationship
+ * Node relation base class which defines the simplest one-to-one relation
  */
 @JsonTypeInfo(
         use = JsonTypeInfo.Id.NAME,
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = FullOuterJoinRelationShip.class, name = "fullOuterJoin"),
-        @JsonSubTypes.Type(value = InnerJoinNodeRelationShip.class, name = "innerJoin"),
-        @JsonSubTypes.Type(value = LeftOuterJoinNodeRelationShip.class, name = "leftOuterJoin"),
-        @JsonSubTypes.Type(value = RightOuterJoinNodeRelationShip.class, name = "rightOutJoin"),
-        @JsonSubTypes.Type(value = UnionNodeRelationShip.class, name = "union"),
-        @JsonSubTypes.Type(value = NodeRelationShip.class, name = "baseRelation")
+        @JsonSubTypes.Type(value = FullOuterJoinRelation.class, name = "fullOuterJoin"),
+        @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"),
+        @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"),
+        @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin"),
+        @JsonSubTypes.Type(value = UnionNodeRelation.class, name = "union"),
+        @JsonSubTypes.Type(value = NodeRelation.class, name = "baseRelation")
 })
 @Data
 @NoArgsConstructor
-public class NodeRelationShip implements Serializable {
+public class NodeRelation implements Serializable {
 
     private static final long serialVersionUID = 5491943876653981952L;
 
@@ -55,14 +55,14 @@ public class NodeRelationShip implements Serializable {
     private List<String> outputs;
 
     /**
-     * NodeRelationShip Constructor
+     * NodeRelation Constructor
      *
      * @param inputs The inputs is a list of input node id
      * @param outputs The outputs is a list of output node id
      */
     @JsonCreator
-    public NodeRelationShip(@JsonProperty("inputs") List<String> inputs,
-            @JsonProperty("outputs") List<String> outputs) {
+    public NodeRelation(@JsonProperty("inputs") List<String> inputs,
+                        @JsonProperty("outputs") List<String> outputs) {
         this.inputs = Preconditions.checkNotNull(inputs, "inputs is null");
         Preconditions.checkState(!inputs.isEmpty(), "inputs is empty");
         this.outputs = Preconditions.checkNotNull(outputs, "outputs is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelation.java
similarity index 76%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelation.java
index 636213f50..4919d3fed 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelation.java
@@ -29,27 +29,28 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Right outer join relationship class which defines the right outer join relationship
+ * Right outer join relation class which defines the right outer join relation
  */
 @JsonTypeName("rightOutJoin")
 @EqualsAndHashCode(callSuper = true)
 @Data
 @NoArgsConstructor
-public class RightOuterJoinNodeRelationShip extends JoinRelationShip {
+public class RightOuterJoinNodeRelation extends JoinRelation {
 
     private static final long serialVersionUID = 9202862229428483437L;
 
     /**
-     * RightOuterJoinNodeRelationShip Constructor
+     * RightOuterJoinNodeRelation Constructor
      *
-     * @param inputs The inputs is a list of input node id
-     * @param outputs The outputs is a list of output node id
+     * @param inputs           The inputs is a list of input node id
+     * @param outputs          The outputs is a list of output node id
      * @param joinConditionMap The joinConditionMap is a map of join conditions
-     *         the key of joinConditionMap is the node id of join node
-     *         the value of joinConditionMap is a list of join contidition
+     *                         the key of joinConditionMap is the node id of join node
+     *                         the value of joinConditionMap is a list of join conditions
      */
     @JsonCreator
-    public RightOuterJoinNodeRelationShip(@JsonProperty("inputs") List<String> inputs,
+    public RightOuterJoinNodeRelation(
+            @JsonProperty("inputs") List<String> inputs,
             @JsonProperty("outputs") List<String> outputs,
             @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
         super(inputs, outputs, joinConditionMap);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelation.java
similarity index 83%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelation.java
index b5cb94a99..a92dbd9de 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelation.java
@@ -25,23 +25,23 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import java.util.List;
 
 /**
- * Union relationship class which defines the union relationship
+ * Union relation class which defines the union relation
  */
 @JsonTypeName("union")
 @EqualsAndHashCode(callSuper = true)
 @Data
-public class UnionNodeRelationShip extends NodeRelationShip {
+public class UnionNodeRelation extends NodeRelation {
 
     private static final long serialVersionUID = 6602357131254518291L;
 
     /**
-     * UnionNodeRelationShip Constructor
+     * UnionNodeRelation Constructor
      *
      * @param inputs The inputs is a list of input node id
      * @param outputs The outputs is a list of output node id
      */
-    public UnionNodeRelationShip(@JsonProperty("inputs") List<String> inputs,
-            @JsonProperty("outputs") List<String> outputs) {
+    public UnionNodeRelation(@JsonProperty("inputs") List<String> inputs,
+                             @JsonProperty("outputs") List<String> outputs) {
         super(inputs, outputs);
     }
 
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
index 26fff58d0..e2ed364ec 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
@@ -41,7 +41,7 @@ public class FieldInfoTest extends SerializeBaseTest<FieldInfo> {
     @Test
     public void testDeserializeWithNodeId() throws JsonProcessingException {
         FieldInfo fieldInfo = new FieldInfo("field_name", StringFormatInfo.INSTANCE);
-        fieldInfo.setNodeId("1L");
+        fieldInfo.setNodeId("1");
         ObjectMapper objectMapper = new ObjectMapper();
         String fieldInfoStr = "{\"type\":\"base\",\"name\":\"field_name\","
                 + "\"formatInfo\":{\"type\":\"string\"},\"nodeId\":\"1\"}";
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
index 38684674f..777acacd5 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
@@ -27,12 +27,12 @@ import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
 import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
 import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -65,14 +65,14 @@ public class GroupInfoTest extends SerializeBaseTest<GroupInfo> {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
@@ -80,10 +80,10 @@ public class GroupInfoTest extends SerializeBaseTest<GroupInfo> {
                 1, null, "id");
     }
 
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     @Override
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/MetaFieldInfoTest.java
similarity index 72%
copy from inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/MetaFieldInfoTest.java
index 0f920d533..fdf71f370 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/MetaFieldInfoTest.java
@@ -15,19 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.protocol.transformation.relation;
+package org.apache.inlong.sort.protocol;
 
 import org.apache.inlong.sort.SerializeBaseTest;
 
-import java.util.Arrays;
-
 /**
- * Tests for {@link NodeRelationShip}
+ * Test for {@link MetaFieldInfo}
  */
-public class NodeRelationTest extends SerializeBaseTest<NodeRelationShip> {
+public class MetaFieldInfoTest extends SerializeBaseTest<MetaFieldInfo> {
 
     @Override
-    public NodeRelationShip getTestObject() {
-        return new NodeRelationShip(Arrays.asList("1", "2"), Arrays.asList("3", "4"));
+    public MetaFieldInfo getTestObject() {
+        return new MetaFieldInfo("f1", MetaFieldInfo.MetaField.DATABASE_NAME);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
index f9f8cf3e7..45a09c5e0 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
@@ -28,12 +28,12 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
 import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
 import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -66,14 +66,14 @@ public class StreamInfoTest extends SerializeBaseTest<StreamInfo> {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
@@ -87,14 +87,14 @@ public class StreamInfoTest extends SerializeBaseTest<StreamInfo> {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new HiveLoadNode("2", "hive_output", fields, relations, null, null,
@@ -102,10 +102,10 @@ public class StreamInfoTest extends SerializeBaseTest<StreamInfo> {
                 null, Arrays.asList(new FieldInfo("day", new LongFormatInfo())));
     }
 
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
index f56a02650..dca17a878 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
@@ -22,7 +22,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Arrays;
 
@@ -35,7 +35,7 @@ public class ClickHouseLoadNodeTest extends SerializeBaseTest<Node> {
 
         return new ClickHouseLoadNode("2", "test_clickhouse",
                 Arrays.asList(new FieldInfo("id", new StringFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("id", new StringFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("id", new StringFormatInfo()),
                         new FieldInfo("id", new StringFormatInfo()))),
                 null,
                 null,
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
index 483d26757..9775ac297 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
 import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Arrays;
 
@@ -31,7 +31,7 @@ public class HbaseLoadNodeTest extends SerializeBaseTest<HbaseLoadNode> {
     public HbaseLoadNode getTestObject() {
         return new HbaseLoadNode("2", "test_hbase",
                 Arrays.asList(new FieldInfo("cf:id", new StringFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("id", new StringFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("id", new StringFormatInfo()),
                         new FieldInfo("cf:id", new StringFormatInfo()))), null, null, 1, null, "mytable", "default",
                 "localhost:2181", "MD5(`id`)", null, null, null, null);
     }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
index 012e3dd05..1bd5aef17 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -35,7 +35,7 @@ public class HiveLoadNodeTest extends SerializeBaseTest<HiveLoadNode> {
     public HiveLoadNode getTestObject() {
         return new HiveLoadNode("1", "test_hive_node",
                 Arrays.asList(new FieldInfo("field", new StringFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("field", new StringFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("field", new StringFormatInfo()),
                         new FieldInfo("field", new StringFormatInfo()))), null, null,
                 1, new HashMap<>(), "myHive", "default",
                 "test", "/opt/hive-conf", "3.1.2",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
index 0ff7316c7..3e93fc9f3 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Arrays;
 import java.util.TreeMap;
@@ -35,7 +35,7 @@ public class KafkaLoadNodeTest extends SerializeBaseTest<KafkaLoadNode> {
     public KafkaLoadNode getTestObject() {
         return new KafkaLoadNode("1", null,
                 Arrays.asList(new FieldInfo("field", new StringFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("field", new StringFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("field", new StringFormatInfo()),
                         new FieldInfo("field", new StringFormatInfo()))), null, null,
                 "topic", "localhost:9092", new CanalJsonFormat(),
                 1, new TreeMap<>(), null);
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNodeTest.java
index 83e47a8f9..d5ff93a25 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
 import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Collections;
 
@@ -34,7 +34,7 @@ public class MySqlLoadNodeTest extends SerializeBaseTest<MySqlLoadNode> {
     public MySqlLoadNode getTestObject() {
         return new MySqlLoadNode("1", "mysql_output",
                 Collections.singletonList(new FieldInfo("name", new StringFormatInfo())),
-                Collections.singletonList(new FieldRelationShip(new FieldInfo("name",
+                Collections.singletonList(new FieldRelation(new FieldInfo("name",
                         new StringFormatInfo()), new FieldInfo("name", new StringFormatInfo()))),
                 null, null, 1, null,
                 "jdbc:mysql://localhost:3306/inlong",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
index ca972fe23..c9e73a63d 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
 import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Collections;
 
@@ -34,7 +34,7 @@ public class OracleLoadNodeTest extends SerializeBaseTest<OracleLoadNode> {
     public OracleLoadNode getTestObject() {
         return new OracleLoadNode("1", "mysql_output",
                 Collections.singletonList(new FieldInfo("NAME", new StringFormatInfo())),
-                Collections.singletonList(new FieldRelationShip(new FieldInfo("name",
+                Collections.singletonList(new FieldRelation(new FieldInfo("name",
                         new StringFormatInfo()), new FieldInfo("NAME", new StringFormatInfo()))),
                 null, null, 1, null,
                 "jdbc:oracle:thin:@localhost:1521:xe",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNodeTest.java
index 8e01fdf5e..e854bffdc 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
 import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Arrays;
 
@@ -39,7 +39,7 @@ public class PostgresLoadNodeTest extends SerializeBaseTest<PostgresLoadNode> {
     public PostgresLoadNode getTestObject() {
         return new PostgresLoadNode("1", "postgres_output", Arrays.asList(new FieldInfo("name",
                 new StringFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo()))), null, null, 1, null,
                 "jdbc:postgresql://localhost:5432/postgres",
                 "postgres",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNodeTest.java
index e99328cf4..75f529671 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNodeTest.java
@@ -22,7 +22,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Arrays;
 import java.util.List;
@@ -36,10 +36,10 @@ public class SqlServerLoadNodeTest extends SerializeBaseTest<SqlServerLoadNode>
     public SqlServerLoadNode getTestObject() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
                 new FieldInfo("name", new StringFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo()))
                 );
         return new SqlServerLoadNode("1", "sqlserver_out", fields, relations, null, null, 1,
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNodeTest.java
index b6a26319b..fe2d9c72f 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
 import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.Arrays;
 
@@ -39,7 +39,7 @@ public class TDSQLPostgresLoadNodeTest extends SerializeBaseTest<TDSQLPostgresLo
     public TDSQLPostgresLoadNode getTestObject() {
         return new TDSQLPostgresLoadNode("1", "tdsqlPostgres_output", Arrays.asList(new FieldInfo("name",
                 new StringFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo()))), null, null, 1, null,
                 "jdbc:postgresql://localhost:5432/postgres",
                 "postgres",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
index 7f308271c..bc9c2cb22 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.OrderDirection;
 
 import java.util.Arrays;
@@ -39,13 +39,13 @@ public class DistinctNodeTest extends SerializeBaseTest<DistinctNode> {
                         new FieldInfo("f3", new StringFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
                 Arrays.asList(
-                        new FieldRelationShip(new FieldInfo("f1", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("f1", new StringFormatInfo()),
                                 new FieldInfo("f1", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("f2", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("f2", new StringFormatInfo()),
                                 new FieldInfo("f2", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("f3", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("f3", new StringFormatInfo()),
                                 new FieldInfo("f3", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new StringFormatInfo()),
                                 new FieldInfo("ts", new StringFormatInfo()))
                 ),
                 null, null,
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShipTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationTest.java
similarity index 82%
rename from inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShipTest.java
rename to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationTest.java
index 8290bf429..fb4db7fca 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShipTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationTest.java
@@ -22,9 +22,9 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 
 /**
- * Test for {@link FieldRelationShip}
+ * Test for {@link FieldRelation}
  */
-public class FieldRelationShipTest extends SerializeBaseTest<FieldRelationShip> {
+public class FieldRelationTest extends SerializeBaseTest<FieldRelation> {
 
     /**
      * Get test object
@@ -32,8 +32,8 @@ public class FieldRelationShipTest extends SerializeBaseTest<FieldRelationShip>
      * @return The test object
      */
     @Override
-    public FieldRelationShip getTestObject() {
-        return new FieldRelationShip(new FieldInfo("f", StringFormatInfo.INSTANCE),
+    public FieldRelation getTestObject() {
+        return new FieldRelation(new FieldInfo("f", StringFormatInfo.INSTANCE),
                 new FieldInfo("f", StringFormatInfo.INSTANCE));
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinNodeRelationTest.java
index 5f6b85b80..67e54d4c9 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinNodeRelationTest.java
@@ -35,12 +35,12 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
- * Tests for {@link FullOuterJoinRelationShip}
+ * Tests for {@link FullOuterJoinRelation}
  */
-public class FullOuterJoinNodeRelationTest extends SerializeBaseTest<FullOuterJoinRelationShip> {
+public class FullOuterJoinNodeRelationTest extends SerializeBaseTest<FullOuterJoinRelation> {
 
     @Override
-    public FullOuterJoinRelationShip getTestObject() {
+    public FullOuterJoinRelation getTestObject() {
         Map<String, List<FilterFunction>> joinConditionMap = new TreeMap<>();
         joinConditionMap.put("2", Arrays.asList(
                 new SingleValueFilterFunction(EmptyOperator.getInstance(),
@@ -58,7 +58,7 @@ public class FullOuterJoinNodeRelationTest extends SerializeBaseTest<FullOuterJo
                 new SingleValueFilterFunction(AndOperator.getInstance(),
                         new FieldInfo("name", "3", new StringFormatInfo()),
                         NotEqualOperator.getInstance(), new ConstantParam("test"))));
-        return new FullOuterJoinRelationShip(Arrays.asList("1", "2", "3"),
+        return new FullOuterJoinRelation(Arrays.asList("1", "2", "3"),
                 Collections.singletonList("4"), joinConditionMap);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationTest.java
index 0e15fe440..aef4c07f8 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationTest.java
@@ -35,12 +35,12 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
- * Tests for {@link InnerJoinNodeRelationShip}
+ * Tests for {@link InnerJoinNodeRelation}
  */
-public class InnerJoinNodeRelationTest extends SerializeBaseTest<InnerJoinNodeRelationShip> {
+public class InnerJoinNodeRelationTest extends SerializeBaseTest<InnerJoinNodeRelation> {
 
     @Override
-    public InnerJoinNodeRelationShip getTestObject() {
+    public InnerJoinNodeRelation getTestObject() {
         Map<String, List<FilterFunction>> joinConditionMap = new TreeMap<>();
         joinConditionMap.put("2", Arrays.asList(
                 new SingleValueFilterFunction(EmptyOperator.getInstance(),
@@ -58,7 +58,7 @@ public class InnerJoinNodeRelationTest extends SerializeBaseTest<InnerJoinNodeRe
                 new SingleValueFilterFunction(AndOperator.getInstance(),
                         new FieldInfo("name", "3", new StringFormatInfo()),
                         NotEqualOperator.getInstance(), new ConstantParam("test"))));
-        return new InnerJoinNodeRelationShip(Arrays.asList("1", "2", "3"),
+        return new InnerJoinNodeRelation(Arrays.asList("1", "2", "3"),
                 Collections.singletonList("4"), joinConditionMap);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationTest.java
index c52760e49..2cc6adbaa 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationTest.java
@@ -35,12 +35,12 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
- * Tests for {@link LeftOuterJoinNodeRelationShip}
+ * Tests for {@link LeftOuterJoinNodeRelation}
  */
-public class LeftOuterJoinNodeRelationTest extends SerializeBaseTest<LeftOuterJoinNodeRelationShip> {
+public class LeftOuterJoinNodeRelationTest extends SerializeBaseTest<LeftOuterJoinNodeRelation> {
 
     @Override
-    public LeftOuterJoinNodeRelationShip getTestObject() {
+    public LeftOuterJoinNodeRelation getTestObject() {
         Map<String, List<FilterFunction>> joinConditionMap = new TreeMap<>();
         joinConditionMap.put("2", Arrays.asList(
                 new SingleValueFilterFunction(EmptyOperator.getInstance(),
@@ -58,7 +58,7 @@ public class LeftOuterJoinNodeRelationTest extends SerializeBaseTest<LeftOuterJo
                 new SingleValueFilterFunction(AndOperator.getInstance(),
                         new FieldInfo("name", "3", new StringFormatInfo()),
                         NotEqualOperator.getInstance(), new ConstantParam("test"))));
-        return new LeftOuterJoinNodeRelationShip(Arrays.asList("1", "2", "3"),
+        return new LeftOuterJoinNodeRelation(Arrays.asList("1", "2", "3"),
                 Collections.singletonList("4"), joinConditionMap);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
index 0f920d533..3827ec9ae 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
@@ -22,12 +22,12 @@ import org.apache.inlong.sort.SerializeBaseTest;
 import java.util.Arrays;
 
 /**
- * Tests for {@link NodeRelationShip}
+ * Tests for {@link NodeRelation}
  */
-public class NodeRelationTest extends SerializeBaseTest<NodeRelationShip> {
+public class NodeRelationTest extends SerializeBaseTest<NodeRelation> {
 
     @Override
-    public NodeRelationShip getTestObject() {
-        return new NodeRelationShip(Arrays.asList("1", "2"), Arrays.asList("3", "4"));
+    public NodeRelation getTestObject() {
+        return new NodeRelation(Arrays.asList("1", "2"), Arrays.asList("3", "4"));
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationTest.java
index fb4bb0647..fb57d629f 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationTest.java
@@ -34,12 +34,12 @@ import java.util.Map;
 import java.util.TreeMap;
 
 /**
- * Tests for {@link RightOuterJoinNodeRelationShip}
+ * Tests for {@link RightOuterJoinNodeRelation}
  */
-public class RightOuterJoinNodeRelationTest extends SerializeBaseTest<RightOuterJoinNodeRelationShip> {
+public class RightOuterJoinNodeRelationTest extends SerializeBaseTest<RightOuterJoinNodeRelation> {
 
     @Override
-    public RightOuterJoinNodeRelationShip getTestObject() {
+    public RightOuterJoinNodeRelation getTestObject() {
         Map<String, List<FilterFunction>> joinConditionMap = new TreeMap<>();
         joinConditionMap.put("2", Arrays.asList(
                 new SingleValueFilterFunction(EmptyOperator.getInstance(),
@@ -57,7 +57,7 @@ public class RightOuterJoinNodeRelationTest extends SerializeBaseTest<RightOuter
                 new SingleValueFilterFunction(AndOperator.getInstance(),
                         new FieldInfo("name", "3", new StringFormatInfo()),
                         NotEqualOperator.getInstance(), new ConstantParam("test"))));
-        return new RightOuterJoinNodeRelationShip(Arrays.asList("1", "2", "3"),
+        return new RightOuterJoinNodeRelation(Arrays.asList("1", "2", "3"),
                 Arrays.asList("4"), joinConditionMap);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationTest.java
index 1849bcc3b..e3de33a6e 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationTest.java
@@ -23,13 +23,13 @@ import java.util.Arrays;
 import java.util.Collections;
 
 /**
- * Tests for {@link UnionNodeRelationShip}
+ * Tests for {@link UnionNodeRelation}
  */
-public class UnionNodeRelationTest extends SerializeBaseTest<UnionNodeRelationShip> {
+public class UnionNodeRelationTest extends SerializeBaseTest<UnionNodeRelation> {
 
     @Override
-    public UnionNodeRelationShip getTestObject() {
-        return new UnionNodeRelationShip(Arrays.asList("1", "2"),
+    public UnionNodeRelation getTestObject() {
+        return new UnionNodeRelation(Arrays.asList("1", "2"),
                 Collections.singletonList("3"));
     }
 }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 396b4cc90..8ef086c6e 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -29,6 +29,7 @@ import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
 import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
@@ -41,13 +42,13 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.Function;
 import org.apache.inlong.sort.protocol.transformation.FunctionParam;
-import org.apache.inlong.sort.protocol.transformation.relation.JoinRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.UnionNodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.JoinRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.UnionNodeRelation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,7 +132,7 @@ public class FlinkSqlParser implements Parser {
     /**
      * Parse stream
      *
-     * @param streamInfo The encapsulation of nodes and node relationships
+     * @param streamInfo The encapsulation of nodes and node relations
      */
     private void parseStream(StreamInfo streamInfo) {
         Preconditions.checkNotNull(streamInfo, "stream is null");
@@ -146,7 +147,7 @@ public class FlinkSqlParser implements Parser {
             Preconditions.checkNotNull(s.getId(), "node id is null");
             nodeMap.put(s.getId(), s);
         });
-        Map<String, NodeRelationShip> relationMap = new HashMap<String, NodeRelationShip>();
+        Map<String, NodeRelation> relationMap = new HashMap<String, NodeRelation>();
         streamInfo.getRelations().forEach(r -> {
             for (String output : r.getOutputs()) {
                 relationMap.put(output, r);
@@ -161,15 +162,15 @@ public class FlinkSqlParser implements Parser {
     /**
      * parse node relation
      * <p>
-     * Here we only parse the output node in the relationship,
+     * Here we only parse the output node in the relation,
      * and the input node parsing is achieved by parsing the dependent node parsing of the output node.
      *
-     * @param relation    Define relationships between nodes, it also shows the data flow
-     * @param nodeMap     Store the mapping relationship between node id and node
-     * @param relationMap Store the mapping relationship between node id and relation
+     * @param relation    Define relations between nodes, it also shows the data flow
+     * @param nodeMap     Store the mapping relation between node id and node
+     * @param relationMap Store the mapping relation between node id and relation
      */
-    private void parseNodeRelation(NodeRelationShip relation, Map<String, Node> nodeMap,
-                                   Map<String, NodeRelationShip> relationMap) {
+    private void parseNodeRelation(NodeRelation relation, Map<String, Node> nodeMap,
+                                   Map<String, NodeRelation> relationMap) {
         log.info("start parse node relation, relation:{}", relation);
         Preconditions.checkNotNull(relation, "relation is null");
         Preconditions.checkState(relation.getInputs().size() > 0,
@@ -201,12 +202,12 @@ public class FlinkSqlParser implements Parser {
      * Parse a node and recursively resolve its dependent nodes
      *
      * @param node        The abstract of extract, transform, load
-     * @param relation    Define relationships between nodes, it also shows the data flow
-     * @param nodeMap     store the mapping relationship between node id and node
-     * @param relationMap Store the mapping relationship between node id and relation
+     * @param relation    Define relations between nodes, it also shows the data flow
+     * @param nodeMap     Store the mapping relation between node id and node
+     * @param relationMap Store the mapping relation between node id and relation
      */
-    private void parseNode(Node node, NodeRelationShip relation, Map<String, Node> nodeMap,
-                           Map<String, NodeRelationShip> relationMap) {
+    private void parseNode(Node node, NodeRelation relation, Map<String, Node> nodeMap,
+                           Map<String, NodeRelation> relationMap) {
         if (hasParsedSet.contains(node.getId())) {
             log.warn("the node has already been parsed, node id:{}", node.getId());
             return;
@@ -240,31 +241,31 @@ public class FlinkSqlParser implements Parser {
                 hasParsedSet.add(node.getId());
             } else if (node instanceof TransformNode) {
                 TransformNode transformNode = (TransformNode) node;
-                Preconditions.checkNotNull(transformNode.getFieldRelationShips(),
+                Preconditions.checkNotNull(transformNode.getFieldRelations(),
                         "field relations is null");
-                Preconditions.checkState(!transformNode.getFieldRelationShips().isEmpty(),
+                Preconditions.checkState(!transformNode.getFieldRelations().isEmpty(),
                         "field relations is empty");
                 String createSql = genCreateSql(node);
                 log.info("node id:{}, create table sql:\n{}", node.getId(), createSql);
                 String selectSql;
-                if (relation instanceof JoinRelationShip) {
-                    // parse join relation ship and generate the transform sql
+                if (relation instanceof JoinRelation) {
+                    // parse join relation and generate the transform sql
                     Preconditions.checkState(relation.getInputs().size() > 1,
                             "join must have more than one input nodes");
                     Preconditions.checkState(relation.getOutputs().size() == 1,
                             "join node only support one output node");
-                    JoinRelationShip joinRelation = (JoinRelationShip) relation;
+                    JoinRelation joinRelation = (JoinRelation) relation;
                     selectSql = genJoinSelectSql(transformNode, joinRelation, nodeMap);
-                } else if (relation instanceof UnionNodeRelationShip) {
-                    // parse union relation ship and generate the transform sql
+                } else if (relation instanceof UnionNodeRelation) {
+                    // parse union relation and generate the transform sql
                     Preconditions.checkState(relation.getInputs().size() > 1,
                             "union must have more than one input nodes");
                     Preconditions.checkState(relation.getOutputs().size() == 1,
                             "join node only support one output node");
-                    UnionNodeRelationShip unionRelation = (UnionNodeRelationShip) relation;
+                    UnionNodeRelation unionRelation = (UnionNodeRelation) relation;
                     selectSql = genUnionNodeSelectSql(transformNode, unionRelation, nodeMap);
                 } else {
-                    // parse base relation ship that one to one and generate the transform sql
+                    // parse the basic one-to-one relation and generate the transform sql
                     Preconditions.checkState(relation.getInputs().size() == 1,
                             "simple transform only support one input node");
                     Preconditions.checkState(relation.getOutputs().size() == 1,
@@ -284,16 +285,16 @@ public class FlinkSqlParser implements Parser {
      *
      * @param transformNode The transform node
      * @param unionRelation The union relation of sql
-     * @param nodeMap       Store the mapping relationship between node id and node
+     * @param nodeMap       Store the mapping relation between node id and node
      * @return Transform sql for this transform logic
      */
     private String genUnionNodeSelectSql(TransformNode transformNode,
-                                         UnionNodeRelationShip unionRelation, Map<String, Node> nodeMap) {
+                                         UnionNodeRelation unionRelation, Map<String, Node> nodeMap) {
         throw new UnsupportedOperationException("Union is not currently supported");
     }
 
     private String genJoinSelectSql(TransformNode node,
-                                    JoinRelationShip relation, Map<String, Node> nodeMap) {
+                                    JoinRelation relation, Map<String, Node> nodeMap) {
         // Get tablename alias map by input nodes
         Map<String, String> tableNameAliasMap = new HashMap<>(relation.getInputs().size());
         relation.getInputs().forEach(s -> {
@@ -303,9 +304,9 @@ public class FlinkSqlParser implements Parser {
         });
         StringBuilder sb = new StringBuilder();
         sb.append("SELECT ");
-        Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(node.getFieldRelationShips().size());
-        // Generate mapping for output field to FieldRelationShip
-        node.getFieldRelationShips().forEach(s -> {
+        Map<String, FieldRelation> fieldRelationMap = new HashMap<>(node.getFieldRelations().size());
+        // Generate mapping for output field to FieldRelation
+        node.getFieldRelations().forEach(s -> {
             fillOutTableNameAlias(Collections.singletonList(s.getInputField()), tableNameAliasMap);
             fieldRelationMap.put(s.getOutputField().getName(), s);
         });
@@ -355,7 +356,7 @@ public class FlinkSqlParser implements Parser {
      *
      * @param params            The params used in filter, join condition, transform function etc.
      * @param tableNameAliasMap The tablename alias map,
-     *                          contains all tablename alias used in this relationship of nodes
+     *                          contains all tablename aliases used in this relation of nodes
      */
     private void fillOutTableNameAlias(List<FunctionParam> params, Map<String, String> tableNameAliasMap) {
         for (FunctionParam param : params) {
@@ -416,16 +417,16 @@ public class FlinkSqlParser implements Parser {
      * Generate the most basic conversion sql one-to-one
      *
      * @param node     The transform node
-     * @param relation Define relationships between nodes, it also shows the data flow
-     * @param nodeMap  Store the mapping relationship between node id and node
+     * @param relation Define relations between nodes, it also shows the data flow
+     * @param nodeMap  Store the mapping relation between node id and node
      * @return Transform sql for this transform logic
      */
     private String genSimpleTransformSelectSql(TransformNode node,
-                                               NodeRelationShip relation, Map<String, Node> nodeMap) {
+                                               NodeRelation relation, Map<String, Node> nodeMap) {
         StringBuilder sb = new StringBuilder();
         sb.append("SELECT ");
-        Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(node.getFieldRelationShips().size());
-        node.getFieldRelationShips().forEach(s -> {
+        Map<String, FieldRelation> fieldRelationMap = new HashMap<>(node.getFieldRelations().size());
+        node.getFieldRelations().forEach(s -> {
             fieldRelationMap.put(s.getOutputField().getName(), s);
         });
         parseFieldRelations(node.getFields(), fieldRelationMap, sb);
@@ -468,9 +469,9 @@ public class FlinkSqlParser implements Parser {
      * @param sb               Container for storing sql
      */
     private void parseFieldRelations(List<FieldInfo> fields,
-                                     Map<String, FieldRelationShip> fieldRelationMap, StringBuilder sb) {
+                                     Map<String, FieldRelation> fieldRelationMap, StringBuilder sb) {
         for (FieldInfo field : fields) {
-            FieldRelationShip fieldRelation = fieldRelationMap.get(field.getName());
+            FieldRelation fieldRelation = fieldRelationMap.get(field.getName());
             if (fieldRelation != null) {
                 sb.append("\n    ").append(fieldRelation.getInputField().format())
                         .append(" AS ").append(field.format()).append(",");
@@ -490,8 +491,8 @@ public class FlinkSqlParser implements Parser {
      * @return Insert sql
      */
     private String genLoadNodeInsertSql(LoadNode loadNode, Node inputNode) {
-        Preconditions.checkNotNull(loadNode.getFieldRelationShips(), "field relations is null");
-        Preconditions.checkState(!loadNode.getFieldRelationShips().isEmpty(),
+        Preconditions.checkNotNull(loadNode.getFieldRelations(), "field relations is null");
+        Preconditions.checkState(!loadNode.getFieldRelations().isEmpty(),
                 "field relations is empty");
         StringBuilder sb = new StringBuilder();
         sb.append("INSERT INTO `").append(loadNode.genTableName()).append("` ");
@@ -499,8 +500,8 @@ public class FlinkSqlParser implements Parser {
         if (loadNode instanceof HbaseLoadNode) {
             parseHbaseLoadFieldRelation((HbaseLoadNode) loadNode, sb);
         } else {
-            Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(loadNode.getFieldRelationShips().size());
-            loadNode.getFieldRelationShips().forEach(s -> {
+            Map<String, FieldRelation> fieldRelationMap = new HashMap<>(loadNode.getFieldRelations().size());
+            loadNode.getFieldRelations().forEach(s -> {
                 fieldRelationMap.put(s.getOutputField().getName(), s);
             });
             parseFieldRelations(loadNode.getFields(), fieldRelationMap, sb);
@@ -512,13 +513,13 @@ public class FlinkSqlParser implements Parser {
 
     private void parseHbaseLoadFieldRelation(HbaseLoadNode hbaseLoadNode, StringBuilder sb) {
         sb.append(hbaseLoadNode.getRowKey()).append(" as rowkey,\n");
-        List<FieldRelationShip> fieldRelationShips = hbaseLoadNode.getFieldRelationShips();
-        Map<String, List<FieldRelationShip>> columnFamilyMapFields = genColumnFamilyMapFieldRelationShips(
-                fieldRelationShips);
-        for (Map.Entry<String, List<FieldRelationShip>> entry : columnFamilyMapFields.entrySet()) {
+        List<FieldRelation> fieldRelations = hbaseLoadNode.getFieldRelations();
+        Map<String, List<FieldRelation>> columnFamilyMapFields = genColumnFamilyMapFieldRelations(
+                fieldRelations);
+        for (Map.Entry<String, List<FieldRelation>> entry : columnFamilyMapFields.entrySet()) {
             StringBuilder fieldAppend = new StringBuilder(" ROW(");
-            for (FieldRelationShip fieldRelationShip : entry.getValue()) {
-                FieldInfo fieldInfo = (FieldInfo) fieldRelationShip.getInputField();
+            for (FieldRelation fieldRelation : entry.getValue()) {
+                FieldInfo fieldInfo = (FieldInfo) fieldRelation.getInputField();
                 fieldAppend.append(fieldInfo.getName()).append(",");
             }
             if (fieldAppend.length() > 0) {
@@ -572,14 +573,14 @@ public class FlinkSqlParser implements Parser {
         StringBuilder sb = new StringBuilder("CREATE TABLE `");
         sb.append(node.genTableName()).append("`(\n");
         sb.append("rowkey STRING,\n");
-        List<FieldRelationShip> fieldRelationShips = node.getFieldRelationShips();
-        Map<String, List<FieldRelationShip>> columnFamilyMapFields = genColumnFamilyMapFieldRelationShips(
-                fieldRelationShips);
-        for (Map.Entry<String, List<FieldRelationShip>> entry : columnFamilyMapFields.entrySet()) {
+        List<FieldRelation> fieldRelations = node.getFieldRelations();
+        Map<String, List<FieldRelation>> columnFamilyMapFields = genColumnFamilyMapFieldRelations(
+                fieldRelations);
+        for (Map.Entry<String, List<FieldRelation>> entry : columnFamilyMapFields.entrySet()) {
             sb.append(entry.getKey());
             StringBuilder fieldsAppend = new StringBuilder(" Row<");
-            for (FieldRelationShip fieldRelationShip : entry.getValue()) {
-                FieldInfo fieldInfo = fieldRelationShip.getOutputField();
+            for (FieldRelation fieldRelation : entry.getValue()) {
+                FieldInfo fieldInfo = fieldRelation.getOutputField();
                 fieldsAppend.append(fieldInfo.getName().split(":")[1]).append(" ")
                         .append(TableFormatUtils.deriveLogicalType(fieldInfo.getFormatInfo()).asSummaryString())
                         .append(",");
@@ -595,13 +596,13 @@ public class FlinkSqlParser implements Parser {
         return sb.toString();
     }
 
-    private Map<String, List<FieldRelationShip>> genColumnFamilyMapFieldRelationShips(
-            List<FieldRelationShip> fieldRelationShips) {
-        Map<String, List<FieldRelationShip>> columnFamilyMapFields = new HashMap<>(16);
-        for (FieldRelationShip fieldRelationShip : fieldRelationShips) {
-            String columnFamily = fieldRelationShip.getOutputField().getName().split(":")[0];
+    private Map<String, List<FieldRelation>> genColumnFamilyMapFieldRelations(
+            List<FieldRelation> fieldRelations) {
+        Map<String, List<FieldRelation>> columnFamilyMapFields = new HashMap<>(16);
+        for (FieldRelation fieldRelation : fieldRelations) {
+            String columnFamily = fieldRelation.getOutputField().getName().split(":")[0];
             columnFamilyMapFields.computeIfAbsent(columnFamily, v -> new ArrayList<>())
-                    .add(fieldRelationShip);
+                    .add(fieldRelation);
         }
         return columnFamilyMapFields;
     }
@@ -666,7 +667,10 @@ public class FlinkSqlParser implements Parser {
         StringBuilder sb = new StringBuilder();
         for (FieldInfo field : fields) {
             sb.append("    `").append(field.getName()).append("` ");
-            if (field instanceof BuiltInFieldInfo) {
+            if (field instanceof MetaFieldInfo) {
+                MetaFieldInfo metaFieldInfo = (MetaFieldInfo) field;
+                parseMetaField(node, metaFieldInfo, sb);
+            } else if (field instanceof BuiltInFieldInfo) {
                 BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) field;
                 parseMetaField(node, builtInFieldInfo, sb);
             } else {
@@ -680,6 +684,7 @@ public class FlinkSqlParser implements Parser {
         return sb.toString();
     }
 
+    @Deprecated
     private void parseMetaField(Node node, BuiltInFieldInfo metaField, StringBuilder sb) {
         if (metaField.getBuiltInField() == BuiltInField.PROCESS_TIME) {
             sb.append(" AS PROCTIME()");
@@ -700,6 +705,27 @@ public class FlinkSqlParser implements Parser {
         }
     }
 
+    private void parseMetaField(Node node, MetaFieldInfo metaFieldInfo, StringBuilder sb) {
+        if (metaFieldInfo.getMetaField() == MetaFieldInfo.MetaField.PROCESS_TIME) {
+            sb.append(" AS PROCTIME()");
+            return;
+        }
+        if (node instanceof MySqlExtractNode) {
+            sb.append(parseMySqlExtractNodeMetaField(metaFieldInfo));
+        } else if (node instanceof OracleExtractNode) {
+            sb.append(parseOracleExtractNodeMetaField(metaFieldInfo));
+        } else if (node instanceof KafkaExtractNode) {
+            sb.append(parseKafkaExtractNodeMetaField(metaFieldInfo));
+        } else if (node instanceof KafkaLoadNode) {
+            sb.append(parseKafkaLoadNodeMetaField(metaFieldInfo));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("This node:%s does not currently support metadata fields",
+                            node.getClass().getName()));
+        }
+    }
+
+    @Deprecated
     private String parseKafkaLoadNodeMetaField(BuiltInFieldInfo metaField) {
         String metaType;
         switch (metaField.getBuiltInField()) {
@@ -745,11 +771,59 @@ public class FlinkSqlParser implements Parser {
                 metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
                 break;
             default:
-                metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
+                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+                        metaField.getBuiltInField()));
+        }
+        return metaType;
+    }
+
+    private String parseKafkaLoadNodeMetaField(MetaFieldInfo metaFieldInfo) {
+        String metaType;
+        switch (metaFieldInfo.getMetaField()) {
+            case TABLE_NAME:
+                metaType = "STRING METADATA FROM 'value.table'";
+                break;
+            case DATABASE_NAME:
+                metaType = "STRING METADATA FROM 'value.database'";
+                break;
+            case OP_TS:
+                metaType = "TIMESTAMP(3) METADATA FROM 'value.event-timestamp'";
+                break;
+            case OP_TYPE:
+                metaType = "STRING METADATA FROM 'value.op-type'";
+                break;
+            case DATA:
+                metaType = "STRING METADATA FROM 'value.data'";
+                break;
+            case IS_DDL:
+                metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
+                break;
+            case TS:
+                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
+                break;
+            case SQL_TYPE:
+                metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
+                break;
+            case MYSQL_TYPE:
+                metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
+                break;
+            case PK_NAMES:
+                metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
+                break;
+            case BATCH_ID:
+                metaType = "BIGINT METADATA FROM 'value.batch-id'";
+                break;
+            case UPDATE_BEFORE:
+                metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+                        metaFieldInfo.getMetaField()));
         }
         return metaType;
     }
 
+    @Deprecated
     private String parseKafkaExtractNodeMetaField(BuiltInFieldInfo metaField) {
         String metaType;
         switch (metaField.getBuiltInField()) {
@@ -793,7 +867,98 @@ public class FlinkSqlParser implements Parser {
                 metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
                 break;
             default:
-                metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
+                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+                        metaField.getBuiltInField()));
+        }
+        return metaType;
+    }
+
+    private String parseKafkaExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
+        String metaType;
+        switch (metaFieldInfo.getMetaField()) {
+            case TABLE_NAME:
+                metaType = "STRING METADATA FROM 'value.table'";
+                break;
+            case DATABASE_NAME:
+                metaType = "STRING METADATA FROM 'value.database'";
+                break;
+            case SQL_TYPE:
+                metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
+                break;
+            case PK_NAMES:
+                metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
+                break;
+            case TS:
+                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
+                break;
+            case OP_TS:
+                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.event-timestamp'";
+                break;
+            // additional metadata
+            case OP_TYPE:
+                metaType = "STRING METADATA FROM 'value.op-type'";
+                break;
+            case IS_DDL:
+                metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
+                break;
+            case MYSQL_TYPE:
+                metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
+                break;
+            case BATCH_ID:
+                metaType = "BIGINT METADATA FROM 'value.batch-id'";
+                break;
+            case UPDATE_BEFORE:
+                metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+                        metaFieldInfo.getMetaField()));
+        }
+        return metaType;
+    }
+
+    private String parseMySqlExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
+        String metaType;
+        switch (metaFieldInfo.getMetaField()) {
+            case TABLE_NAME:
+                metaType = "STRING METADATA FROM 'meta.table_name' VIRTUAL";
+                break;
+            case DATABASE_NAME:
+                metaType = "STRING METADATA FROM 'meta.database_name' VIRTUAL";
+                break;
+            case OP_TS:
+                metaType = "TIMESTAMP(3) METADATA FROM 'meta.op_ts' VIRTUAL";
+                break;
+            case OP_TYPE:
+                metaType = "STRING METADATA FROM 'meta.op_type' VIRTUAL";
+                break;
+            case DATA:
+                metaType = "STRING METADATA FROM 'meta.data' VIRTUAL";
+                break;
+            case IS_DDL:
+                metaType = "BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL";
+                break;
+            case TS:
+                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL";
+                break;
+            case SQL_TYPE:
+                metaType = "MAP<STRING, INT> METADATA FROM 'meta.sql_type' VIRTUAL";
+                break;
+            case MYSQL_TYPE:
+                metaType = "MAP<STRING, STRING> METADATA FROM 'meta.mysql_type' VIRTUAL";
+                break;
+            case PK_NAMES:
+                metaType = "ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL";
+                break;
+            case BATCH_ID:
+                metaType = "BIGINT METADATA FROM 'meta.batch_id' VIRTUAL";
+                break;
+            case UPDATE_BEFORE:
+                metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before' VIRTUAL";
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+                        metaFieldInfo.getMetaField()));
         }
         return metaType;
     }
@@ -843,11 +1008,13 @@ public class FlinkSqlParser implements Parser {
                 metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before' VIRTUAL";
                 break;
             default:
-                metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
+                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+                        metaField.getBuiltInField()));
         }
         return metaType;
     }
 
+    @Deprecated
     private String parseOracleExtractNodeMetaField(BuiltInFieldInfo metaField) {
         String metaType;
         switch (metaField.getBuiltInField()) {
@@ -864,7 +1031,30 @@ public class FlinkSqlParser implements Parser {
                 metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL";
                 break;
             default:
-                metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
+                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+                        metaField.getBuiltInField()));
+        }
+        return metaType;
+    }
+
+    private String parseOracleExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
+        String metaType;
+        switch (metaFieldInfo.getMetaField()) {
+            case TABLE_NAME:
+                metaType = "STRING METADATA FROM 'table_name' VIRTUAL";
+                break;
+            case SCHEMA_NAME:
+                metaType = "STRING METADATA FROM 'schema_name' VIRTUAL";
+                break;
+            case DATABASE_NAME:
+                metaType = "STRING METADATA FROM 'database_name' VIRTUAL";
+                break;
+            case OP_TS:
+                metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL";
+                break;
+            default:
+                throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+                        metaFieldInfo.getMetaField()));
         }
         return metaType;
     }
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 7fb4d6fe4..3a6057517 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -31,8 +31,8 @@ import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -62,8 +62,8 @@ public class AllMigrateTest {
 
     private KafkaLoadNode buildAllMigrateKafkaNode() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("data", new StringFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("data", new StringFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("data", new StringFormatInfo()),
                         new FieldInfo("data", new StringFormatInfo())));
         CsvFormat csvFormat = new CsvFormat();
         csvFormat.setDisableQuoteCharacter(true);
@@ -73,10 +73,10 @@ public class AllMigrateTest {
                 null, null);
     }
 
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
index f0b5c31d2..5c0c9893e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -63,10 +63,10 @@ public class ClickHouseSqlParserTest {
     private ClickHouseLoadNode buildClickHouseLoadNode(String id) {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
                 new FieldInfo("name", new StringFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())));
 
         return new ClickHouseLoadNode(id, "test_clickhouse",
@@ -90,10 +90,10 @@ public class ClickHouseSqlParserTest {
      * @param outputs load node
      * @return node relation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
index 5f1192922..bc538e3dd 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
@@ -39,13 +39,13 @@ import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.OrderDirection;
 import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
 import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
 import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -105,14 +105,14 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
@@ -127,14 +127,14 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
@@ -149,14 +149,14 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
@@ -174,13 +174,13 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ),
                 Arrays.asList(
-                        new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 ),
                 null, null,
@@ -198,13 +198,13 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ),
                 Arrays.asList(
-                        new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 ),
                 null, null,
@@ -222,13 +222,13 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ),
                 Arrays.asList(
-                        new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 ),
                 null, null,
@@ -237,10 +237,10 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 OrderDirection.ASC);
     }
 
-    public NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    public NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
index ef4a562ba..59f2394c3 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
@@ -37,14 +37,14 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
 import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.LessThanOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -78,14 +78,14 @@ public class FilterParseTest extends AbstractTestBase {
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo())
         );
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         List<FilterFunction> filters = Arrays.asList(
@@ -102,10 +102,10 @@ public class FilterParseTest extends AbstractTestBase {
                 null, "id");
     }
 
-    public NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    public NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
index b81a54c92..586488a80 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
@@ -39,12 +39,12 @@ import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
 import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
 import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -96,14 +96,14 @@ public class FlinkSqlParserTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode(id, "kafka_output", fields, relations, null, null,
@@ -112,10 +112,10 @@ public class FlinkSqlParserTest extends AbstractTestBase {
                 null, null);
     }
 
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     private HiveLoadNode buildHiveNode(String id) {
@@ -124,14 +124,14 @@ public class FlinkSqlParserTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new HiveLoadNode(id, "hive_output",
@@ -147,14 +147,14 @@ public class FlinkSqlParserTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
         return new FileSystemLoadNode(id, "hdfs_output", fields, relations,
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
index cd629ee6c..ae3ae8b68 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
@@ -39,7 +39,7 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.OrderDirection;
 import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -48,8 +48,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.FullOuterJoinRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.FullOuterJoinRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -61,7 +61,7 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
- * Test for {@link FullOuterJoinRelationShip}
+ * Test for {@link FullOuterJoinRelation}
  */
 public class FullOuterJoinSqlParseTest extends AbstractTestBase {
 
@@ -118,16 +118,16 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
                                 new FieldInfo("salary", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -149,15 +149,15 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
                         new FieldInfo("salary", new FloatFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ), Arrays.asList(
-                new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                         new FieldInfo("id", new LongFormatInfo())),
-                new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo())),
-                new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                         new FieldInfo("age", new IntFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo()))
         ), null, null);
     }
@@ -183,17 +183,17 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
                         new FieldInfo("salary", new FloatFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ), Arrays.asList(
-                new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                         new FieldInfo("id", new LongFormatInfo())),
-                new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo())),
-                new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                         new FieldInfo("age", new IntFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
                         new FieldInfo("salary", new TimestampFormatInfo()))
         ), filters, null,
                 Collections.singletonList(new FieldInfo("name", "1", new StringFormatInfo())),
@@ -205,12 +205,12 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
      *
      * @param inputs  The inputs that is the id list of input nodes
      * @param outputs The outputs that is the id list of output nodes
-     * @return A NodeRelationShip
+     * @return A NodeRelation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
@@ -218,9 +218,9 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
      *
      * @param inputs  The inputs that is the id list of input nodes
      * @param outputs The outputs that is the id list of output nodes
-     * @return A FullOuterJoinRelationShip
+     * @return A FullOuterJoinRelation
      */
-    private NodeRelationShip buildFullOuterJoinRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildFullOuterJoinRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
         Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -230,7 +230,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
         conditionMap.put("3", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
                 new FieldInfo("id", "1", new LongFormatInfo()), EqualOperator.getInstance(),
                 new FieldInfo("id", "3", new LongFormatInfo()))));
-        return new FullOuterJoinRelationShip(inputIds, outputIds, conditionMap);
+        return new FullOuterJoinRelation(inputIds, outputIds, conditionMap);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
index cb8e6282a..19feacd00 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -74,9 +74,9 @@ public class HbaseLoadFlinkSqlParseTest extends AbstractTestBase {
         return new HbaseLoadNode("2", "test_hbase",
                 Arrays.asList(new FieldInfo("cf:age", new LongFormatInfo()), new FieldInfo("cf:name",
                         new StringFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("age", new LongFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("age", new LongFormatInfo()),
                                 new FieldInfo("cf:age", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("cf:name", new StringFormatInfo()))), null, null, 1, null, "mytable",
                 "default",
                 "localhost:2181", "MD5(`name`)", null, null, null, null);
@@ -89,10 +89,10 @@ public class HbaseLoadFlinkSqlParseTest extends AbstractTestBase {
      * @param outputs load node
      * @return node relation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
index f59a5d397..565d32633 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
@@ -36,8 +36,8 @@ import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -75,14 +75,14 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
                 new FieldInfo("name", new StringFormatInfo()),
                 new FieldInfo("salary", new StringFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
 
@@ -108,14 +108,14 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
                 new FieldInfo("name", new StringFormatInfo()),
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo()))
                 );
 
@@ -144,10 +144,10 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
      * @param outputs load node
      * @return node relation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     @Test
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
similarity index 87%
rename from inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
rename to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
index 7fae01782..e744348a3 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
@@ -39,7 +39,7 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.OrderDirection;
 import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -48,8 +48,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -61,9 +61,9 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
- * Test for {@link InnerJoinNodeRelationShip}
+ * Test for {@link InnerJoinNodeRelation}
  */
-public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
+public class InnerJoinRelationSqlParseTest extends AbstractTestBase {
 
     /**
      * Build the first kafka extract node
@@ -118,16 +118,16 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
                                 new FieldInfo("salary", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -147,16 +147,16 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
                                 new FieldInfo("salary", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -178,15 +178,15 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
                         new FieldInfo("salary", new FloatFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ), Arrays.asList(
-                new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                         new FieldInfo("id", new LongFormatInfo())),
-                new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo())),
-                new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                         new FieldInfo("age", new IntFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo()))
         ), null, null);
     }
@@ -212,17 +212,17 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
                         new FieldInfo("salary", new FloatFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ), Arrays.asList(
-                new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                         new FieldInfo("id", new LongFormatInfo())),
-                new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo())),
-                new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                         new FieldInfo("age", new IntFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
                         new FieldInfo("salary", new TimestampFormatInfo()))
         ), filters, null,
                 Collections.singletonList(new FieldInfo("name", "1", new StringFormatInfo())),
@@ -234,12 +234,12 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
      *
      * @param inputs  The inputs that is the id list of input nodes
      * @param outputs The outputs that is the id list of output nodes
-     * @return A NodeRelationShip
+     * @return A NodeRelation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
@@ -247,9 +247,9 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
      *
      * @param inputs  The inputs that is the id list of input nodes
      * @param outputs The outputs that is the id list of output nodes
-     * @return A InnerJoinNodeRelationShip
+     * @return A InnerJoinNodeRelation
      */
-    private NodeRelationShip buildInnerJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildInnerJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
         Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -259,7 +259,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
         conditionMap.put("3", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
                 new FieldInfo("id", "1", new LongFormatInfo()), EqualOperator.getInstance(),
                 new FieldInfo("id", "3", new LongFormatInfo()))));
-        return new InnerJoinNodeRelationShip(inputIds, outputIds, conditionMap);
+        return new InnerJoinNodeRelation(inputIds, outputIds, conditionMap);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
index 5758372fc..32b3dd915 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
@@ -40,7 +40,7 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.OrderDirection;
 import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -49,8 +49,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -62,7 +62,7 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
- * Test for {@link LeftOuterJoinNodeRelationShip}
+ * Test for {@link LeftOuterJoinNodeRelation}
  */
 public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
 
@@ -119,16 +119,16 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
                                 new FieldInfo("salary", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -150,15 +150,15 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
                         new FieldInfo("salary", new FloatFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ), Arrays.asList(
-                new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                         new FieldInfo("id", new LongFormatInfo())),
-                new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo())),
-                new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                         new FieldInfo("age", new IntFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo()))
         ), null, null);
     }
@@ -184,17 +184,17 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
                         new FieldInfo("salary", new FloatFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ), Arrays.asList(
-                new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                         new FieldInfo("id", new LongFormatInfo())),
-                new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo())),
-                new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                         new FieldInfo("age", new IntFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
                         new FieldInfo("salary", new TimestampFormatInfo()))
         ), filters, FilterStrategy.RETAIN,
                 Collections.singletonList(new FieldInfo("name", "1", new StringFormatInfo())),
@@ -206,12 +206,12 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
      *
      * @param inputs  The inputs that is the id list of input nodes
      * @param outputs The outputs that is the id list of output nodes
-     * @return A NodeRelationShip
+     * @return A NodeRelation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
@@ -219,9 +219,9 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
      *
      * @param inputs  The inputs that is the id list of input nodes
      * @param outputs The outputs that is the id list of output nodes
-     * @return A LeftOuterJoinNodeRelationShip
+     * @return A LeftOuterJoinNodeRelation
      */
-    private NodeRelationShip buildLeftOuterJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildLeftOuterJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
         Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -231,7 +231,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
         conditionMap.put("3", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
                 new FieldInfo("id", "1", new LongFormatInfo()), EqualOperator.getInstance(),
                 new FieldInfo("id", "3", new LongFormatInfo()))));
-        return new LeftOuterJoinNodeRelationShip(inputIds, outputIds, conditionMap);
+        return new LeftOuterJoinNodeRelation(inputIds, outputIds, conditionMap);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
index 252ca4f0e..045fba0c7 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
@@ -42,8 +42,8 @@ import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -119,36 +119,36 @@ public class MetaFieldSyncTest extends AbstractTestBase {
                 new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
                         new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
         );
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("database", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("database", new TimestampFormatInfo()),
                                 new FieldInfo("database", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("table", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("table", new TimestampFormatInfo()),
                                 new FieldInfo("table", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("pk_names", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("pk_names", new TimestampFormatInfo()),
                                 new FieldInfo("pk_names", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("event_time", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("event_time", new TimestampFormatInfo()),
                                 new FieldInfo("event_time", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("event_type", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("event_type", new TimestampFormatInfo()),
                                 new FieldInfo("event_type", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("isddl", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("isddl", new TimestampFormatInfo()),
                                 new FieldInfo("isddl", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("batch_id", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("batch_id", new TimestampFormatInfo()),
                                 new FieldInfo("batch_id", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("mysql_type", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("mysql_type", new TimestampFormatInfo()),
                                 new FieldInfo("mysql_type", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("sql_type", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("sql_type", new TimestampFormatInfo()),
                                 new FieldInfo("sql_type", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("meta_ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("meta_ts", new TimestampFormatInfo()),
                                 new FieldInfo("meta_ts", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("up_before", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("up_before", new TimestampFormatInfo()),
                                 new FieldInfo("up_before", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
@@ -219,36 +219,36 @@ public class MetaFieldSyncTest extends AbstractTestBase {
                 new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
                         new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
         );
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("database", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("database", new TimestampFormatInfo()),
                                 new FieldInfo("database", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("table", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("table", new TimestampFormatInfo()),
                                 new FieldInfo("table", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("pk_names", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("pk_names", new TimestampFormatInfo()),
                                 new FieldInfo("pk_names", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("event_time", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("event_time", new TimestampFormatInfo()),
                                 new FieldInfo("event_time", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("event_type", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("event_type", new TimestampFormatInfo()),
                                 new FieldInfo("event_type", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("isddl", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("isddl", new TimestampFormatInfo()),
                                 new FieldInfo("isddl", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("batch_id", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("batch_id", new TimestampFormatInfo()),
                                 new FieldInfo("batch_id", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("mysql_type", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("mysql_type", new TimestampFormatInfo()),
                                 new FieldInfo("mysql_type", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("sql_type", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("sql_type", new TimestampFormatInfo()),
                                 new FieldInfo("sql_type", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("meta_ts", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("meta_ts", new TimestampFormatInfo()),
                                 new FieldInfo("meta_ts", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("up_before", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("up_before", new TimestampFormatInfo()),
                                 new FieldInfo("up_before", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("4", "kafka_output2", fields, relations, null,
@@ -257,10 +257,10 @@ public class MetaFieldSyncTest extends AbstractTestBase {
                 null, "id");
     }
 
-    public NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    public NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
index da114700c..7ed02fcd7 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -60,10 +60,10 @@ public class MongoExtractFlinkSqlParseTest extends AbstractTestBase {
     private KafkaLoadNode buildAllMigrateKafkaNode() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()),
                 new FieldInfo("_id", new StringFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("_id", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("_id", new StringFormatInfo()),
                                 new FieldInfo("_id", new StringFormatInfo())));
         CsvFormat csvFormat = new CsvFormat();
         csvFormat.setDisableQuoteCharacter(true);
@@ -73,10 +73,10 @@ public class MongoExtractFlinkSqlParseTest extends AbstractTestBase {
                 null, "_id");
     }
 
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
index 20f944cb9..f180b68e4 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
@@ -33,8 +33,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -67,12 +67,12 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase {
                 new FieldInfo("name", new StringFormatInfo()),
                 new FieldInfo("age", new IntFormatInfo())
         );
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo()))
                 );
         return new MySqlLoadNode("2", "mysql_output", fields, relations, null,
@@ -87,10 +87,10 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase {
      * @param outputs load node
      * @return node relation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
index c5106f6fb..f18eafd4b 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
@@ -34,8 +34,8 @@ import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -70,12 +70,12 @@ public class OracleExtractSqlParseTest extends AbstractTestBase {
                 new FieldInfo("name", new StringFormatInfo()),
                 new FieldInfo("age", new IntFormatInfo())
         );
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("ID", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("ID", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("NAME", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("NAME", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("AGE", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("AGE", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo()))
                 );
         return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
@@ -91,10 +91,10 @@ public class OracleExtractSqlParseTest extends AbstractTestBase {
      * @param outputs load node
      * @return node relation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
index b7553c183..d6beb8023 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
@@ -33,8 +33,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -67,12 +67,12 @@ public class OracleLoadSqlParseTest extends AbstractTestBase {
                 new FieldInfo("NAME", new StringFormatInfo()),
                 new FieldInfo("AGE", new IntFormatInfo())
         );
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("ID", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("NAME", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("AGE", new IntFormatInfo()))
                 );
         return new OracleLoadNode("2", "oracle_output", fields, relations, null,
@@ -87,10 +87,10 @@ public class OracleLoadSqlParseTest extends AbstractTestBase {
      * @param outputs load node
      * @return node relation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
index 3dec4f593..25bb170e2 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
@@ -33,8 +33,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -70,9 +70,9 @@ public class PostgresExtractFlinkSqlParseTest extends AbstractTestBase {
         return new HbaseLoadNode("2", "hbase_output",
                 Arrays.asList(new FieldInfo("cf:age", new LongFormatInfo()), new FieldInfo("cf:name",
                         new StringFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("age", new LongFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("age", new LongFormatInfo()),
                                 new FieldInfo("cf:age", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("cf:name", new StringFormatInfo()))), null, null, 1, null, "user",
                 "default",
                 "localhost:2181", "MD5(`name`)", null, "/hbase", null, null);
@@ -85,10 +85,10 @@ public class PostgresExtractFlinkSqlParseTest extends AbstractTestBase {
      * @param outputs load node
      * @return node relation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
index 49317f3f1..3dc17c7f2 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -73,9 +73,9 @@ public class PostgresLoadNodeFlinkSqlParseTest extends AbstractTestBase {
     private PostgresLoadNode buildPostgresLoadNode() {
         return new PostgresLoadNode("2", "postgres_output", Arrays.asList(new FieldInfo("name",
                 new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo()))), null, null, 1, null,
                 "jdbc:postgresql://localhost:5432/postgres",
                 "postgres",
@@ -91,10 +91,10 @@ public class PostgresLoadNodeFlinkSqlParseTest extends AbstractTestBase {
      * @param outputs load node
      * @return node relation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
index d28e788d0..8b3a27e22 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
@@ -34,8 +34,8 @@ import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -49,10 +49,10 @@ public class PulsarSqlParserTest {
     private KafkaLoadNode buildKafkaLoadNode() {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
                 new FieldInfo("name", new StringFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())));
         return new KafkaLoadNode("1", "kafka_output", fields, relations, null, null,
                 "workerJson", "localhost:9092",
@@ -77,10 +77,10 @@ public class PulsarSqlParserTest {
                 null);
     }
 
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     @Test
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
index 51af295fd..d94387a86 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
@@ -39,7 +39,7 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
 import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.OrderDirection;
 import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -48,8 +48,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -61,7 +61,7 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
- * Test for {@link RightOuterJoinNodeRelationShip}
+ * Test for {@link RightOuterJoinNodeRelation}
  */
 public class RightOuterJoinSqlParseTest extends AbstractTestBase {
 
@@ -118,16 +118,16 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("age", new IntFormatInfo()),
                 new FieldInfo("salary", new FloatFormatInfo()),
                 new FieldInfo("ts", new TimestampFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                                 new FieldInfo("ts", new TimestampFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+                        new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
                                 new FieldInfo("salary", new TimestampFormatInfo()))
                 );
         return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -149,15 +149,15 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
                         new FieldInfo("salary", new FloatFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ), Arrays.asList(
-                new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                         new FieldInfo("id", new LongFormatInfo())),
-                new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo())),
-                new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                         new FieldInfo("age", new IntFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo()))
         ), null, null);
     }
@@ -183,17 +183,17 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
                         new FieldInfo("salary", new FloatFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())
                 ), Arrays.asList(
-                new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+                new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
                         new FieldInfo("id", new LongFormatInfo())),
-                new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+                new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
                         new FieldInfo("name", new StringFormatInfo())),
-                new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+                new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
                         new FieldInfo("age", new IntFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
                         new FieldInfo("ts", new TimestampFormatInfo())),
-                new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+                new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
                         new FieldInfo("salary", new TimestampFormatInfo()))
         ), filters, null,
                 Collections.singletonList(new FieldInfo("name", "1", new StringFormatInfo())),
@@ -205,12 +205,12 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
      *
      * @param inputs  The inputs that is the id list of input nodes
      * @param outputs The outputs that is the id list of output nodes
-     * @return A NodeRelationShip
+     * @return A NodeRelation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
@@ -218,9 +218,9 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
      *
      * @param inputs  The inputs that is the id list of input nodes
      * @param outputs The outputs that is the id list of output nodes
-     * @return A RightOuterJoinNodeRelationShip
+     * @return A RightOuterJoinNodeRelation
      */
-    private NodeRelationShip buildRightOuterJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildRightOuterJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
         Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -230,7 +230,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
         conditionMap.put("3", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
                 new FieldInfo("id", "1", new LongFormatInfo()), EqualOperator.getInstance(),
                 new FieldInfo("id", "3", new LongFormatInfo()))));
-        return new RightOuterJoinNodeRelationShip(inputIds, outputIds, conditionMap);
+        return new RightOuterJoinNodeRelation(inputIds, outputIds, conditionMap);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
index 43b5a2e50..e0e86a4f6 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
@@ -33,8 +33,8 @@ import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -83,10 +83,10 @@ public class SqlServerNodeSqlParseTest extends AbstractTestBase {
     private KafkaLoadNode buildKafkaNode(String id) {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
                 new FieldInfo("val_char", new StringFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("val_char", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("val_char", new StringFormatInfo()),
                                 new FieldInfo("val_char", new StringFormatInfo()))
                 );
         return new KafkaLoadNode(id, "kafka_output", fields, relations, null, null,
@@ -101,10 +101,10 @@ public class SqlServerNodeSqlParseTest extends AbstractTestBase {
     private SqlServerLoadNode buildSqlServerLoadNode(String id) {
         List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
                 new FieldInfo("name", new StringFormatInfo()));
-        List<FieldRelationShip> relations = Arrays
-                .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
                                 new FieldInfo("id", new LongFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo()))
                 );
         return new SqlServerLoadNode(id, "sqlserver_out", fields, relations, null, null, 1,
@@ -115,10 +115,10 @@ public class SqlServerNodeSqlParseTest extends AbstractTestBase {
     /**
      * Build node relation.
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
index 22033f8f5..8622c1b47 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -73,9 +73,9 @@ public class TDSQLPostgresLoadNodeFlinkSqlParseTest extends AbstractTestBase {
     private TDSQLPostgresLoadNode buildTDSQLPostgresLoadNode() {
         return new TDSQLPostgresLoadNode("2", "tdsqlPostgres_output", Arrays.asList(new FieldInfo("name",
                 new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo())),
-                Arrays.asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+                Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
                                 new FieldInfo("name", new StringFormatInfo())),
-                        new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
                                 new FieldInfo("age", new IntFormatInfo()))), null, null, 1, null,
                 "jdbc:postgresql://localhost:5432/tdsql",
                 "tdsqlpostgres",
@@ -91,10 +91,10 @@ public class TDSQLPostgresLoadNodeFlinkSqlParseTest extends AbstractTestBase {
      * @param outputs load node
      * @return node relation
      */
-    private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
         List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
         List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
-        return new NodeRelationShip(inputIds, outputIds);
+        return new NodeRelation(inputIds, outputIds);
     }
 
     /**