You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/30 08:55:09 UTC
[incubator-inlong] branch master updated: [INLONG-4428][Sort][Manager] Optimize the name for Data Node related modules and classes (#4432)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0d68ac524 [INLONG-4428][Sort][Manager] Optimize the name for Data Node related modules and classes (#4432)
0d68ac524 is described below
commit 0d68ac524578285d044abec0f9c7dd863b20288f
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon May 30 16:55:03 2022 +0800
[INLONG-4428][Sort][Manager] Optimize the name for Data Node related modules and classes (#4432)
---
.../service/sort/CreateSortConfigListenerV2.java | 14 +-
.../sort/CreateStreamSortConfigListener.java | 16 +-
...ationShipUtils.java => FieldRelationUtils.java} | 96 +++---
.../manager/service/sort/util/LoadNodeUtils.java | 16 +-
.../service/sort/util/NodeRelationShipUtils.java | 30 +-
.../service/sort/util/TransformNodeUtils.java | 6 +-
.../inlong/sort/protocol/BuiltInFieldInfo.java | 1 +
.../org/apache/inlong/sort/protocol/FieldInfo.java | 36 +--
.../apache/inlong/sort/protocol/MetaFieldInfo.java | 127 ++++++++
.../apache/inlong/sort/protocol/StreamInfo.java | 8 +-
.../inlong/sort/protocol/node/ExtractNode.java | 8 +-
.../apache/inlong/sort/protocol/node/LoadNode.java | 30 +-
.../protocol/node/load/ClickHouseLoadNode.java | 6 +-
.../protocol/node/load/FileSystemLoadNode.java | 6 +-
.../sort/protocol/node/load/HbaseLoadNode.java | 6 +-
.../sort/protocol/node/load/HiveLoadNode.java | 6 +-
.../sort/protocol/node/load/IcebergLoadNode.java | 6 +-
.../sort/protocol/node/load/KafkaLoadNode.java | 6 +-
.../sort/protocol/node/load/MySqlLoadNode.java | 6 +-
.../sort/protocol/node/load/OracleLoadNode.java | 6 +-
.../sort/protocol/node/load/PostgresLoadNode.java | 6 +-
.../sort/protocol/node/load/SqlServerLoadNode.java | 6 +-
.../protocol/node/load/TDSQLPostgresLoadNode.java | 6 +-
.../sort/protocol/node/transform/DistinctNode.java | 8 +-
.../protocol/node/transform/TransformNode.java | 14 +-
.../{FieldRelationShip.java => FieldRelation.java} | 11 +-
...elationShip.java => FullOuterJoinRelation.java} | 12 +-
...elationShip.java => InnerJoinNodeRelation.java} | 12 +-
.../{JoinRelationShip.java => JoinRelation.java} | 22 +-
...ionShip.java => LeftOuterJoinNodeRelation.java} | 9 +-
.../{NodeRelationShip.java => NodeRelation.java} | 22 +-
...onShip.java => RightOuterJoinNodeRelation.java} | 17 +-
...odeRelationShip.java => UnionNodeRelation.java} | 10 +-
.../apache/inlong/sort/protocol/FieldInfoTest.java | 2 +-
.../apache/inlong/sort/protocol/GroupInfoTest.java | 18 +-
...odeRelationTest.java => MetaFieldInfoTest.java} | 12 +-
.../inlong/sort/protocol/StreamInfoTest.java | 28 +-
.../protocol/node/load/ClickHouseLoadNodeTest.java | 4 +-
.../sort/protocol/node/load/HbaseLoadNodeTest.java | 4 +-
.../sort/protocol/node/load/HiveLoadNodeTest.java | 4 +-
.../sort/protocol/node/load/KafkaLoadNodeTest.java | 4 +-
.../sort/protocol/node/load/MySqlLoadNodeTest.java | 4 +-
.../protocol/node/load/OracleLoadNodeTest.java | 4 +-
.../protocol/node/load/PostgresLoadNodeTest.java | 4 +-
.../protocol/node/load/SqlServerLoadNodeTest.java | 8 +-
.../node/load/TDSQLPostgresLoadNodeTest.java | 4 +-
.../protocol/node/transform/DistinctNodeTest.java | 10 +-
...elationShipTest.java => FieldRelationTest.java} | 8 +-
.../relation/FullOuterJoinNodeRelationTest.java | 8 +-
.../relation/InnerJoinNodeRelationTest.java | 8 +-
.../relation/LeftOuterJoinNodeRelationTest.java | 8 +-
.../transformation/relation/NodeRelationTest.java | 8 +-
.../relation/RightOuterJoinNodeRelationTest.java | 8 +-
.../relation/UnionNodeRelationTest.java | 8 +-
.../inlong/sort/parser/impl/FlinkSqlParser.java | 324 ++++++++++++++++-----
.../apache/inlong/sort/parser/AllMigrateTest.java | 12 +-
.../sort/parser/ClickHouseSqlParserTest.java | 14 +-
.../sort/parser/DistinctNodeSqlParseTest.java | 62 ++--
.../apache/inlong/sort/parser/FilterParseTest.java | 18 +-
.../inlong/sort/parser/FlinkSqlParserTest.java | 38 +--
.../sort/parser/FullOuterJoinSqlParseTest.java | 54 ++--
.../sort/parser/HbaseLoadFlinkSqlParseTest.java | 12 +-
.../sort/parser/IcebergNodeSqlParserTest.java | 28 +-
...est.java => InnerJoinRelationSqlParseTest.java} | 68 ++---
.../sort/parser/LeftOuterJoinSqlParseTest.java | 54 ++--
.../inlong/sort/parser/MetaFieldSyncTest.java | 72 ++---
.../sort/parser/MongoExtractFlinkSqlParseTest.java | 14 +-
.../inlong/sort/parser/MySqlLoadSqlParseTest.java | 16 +-
.../sort/parser/OracleExtractSqlParseTest.java | 16 +-
.../inlong/sort/parser/OracleLoadSqlParseTest.java | 16 +-
.../parser/PostgresExtractFlinkSqlParseTest.java | 12 +-
.../parser/PostgresLoadNodeFlinkSqlParseTest.java | 12 +-
.../inlong/sort/parser/PulsarSqlParserTest.java | 14 +-
.../sort/parser/RightOuterJoinSqlParseTest.java | 54 ++--
.../sort/parser/SqlServerNodeSqlParseTest.java | 20 +-
.../TDSQLPostgresLoadNodeFlinkSqlParseTest.java | 12 +-
76 files changed, 1003 insertions(+), 696 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 1a00a22f8..1a9032088 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -48,7 +48,7 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -115,7 +115,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
createNodesForStream(
sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())),
- createNodeRelationshipsForStream(
+ createNodeRelationsForStream(
sourceResponseMap.get(inlongStreamInfo.getInlongStreamId()),
sinkResponseMap.get(inlongStreamInfo.getInlongStreamId())))
).collect(Collectors.toList());
@@ -168,17 +168,17 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
return nodes;
}
- private List<NodeRelationShip> createNodeRelationshipsForStream(
+ private List<NodeRelation> createNodeRelationsForStream(
List<SourceResponse> sourceResponses,
List<SinkResponse> sinkResponses) {
- NodeRelationShip relationship = new NodeRelationShip();
+ NodeRelation relation = new NodeRelation();
List<String> inputs = sourceResponses.stream().map(SourceResponse::getSourceName)
.collect(Collectors.toList());
List<String> outputs = sinkResponses.stream().map(SinkResponse::getSinkName)
.collect(Collectors.toList());
- relationship.setInputs(inputs);
- relationship.setOutputs(outputs);
- return Lists.newArrayList(relationship);
+ relation.setInputs(inputs);
+ relation.setOutputs(outputs);
+ return Lists.newArrayList(relation);
}
private void upsertDataFlow(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index bf03da23a..55f2b0ad0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -48,7 +48,7 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -95,8 +95,8 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
try {
List<SourceResponse> sourceResponses = createPulsarSources(groupInfo, streamInfo);
List<Node> nodes = createNodesForStream(sourceResponses, sinkResponses);
- List<NodeRelationShip> nodeRelationships = createNodeRelationshipsForStream(sourceResponses, sinkResponses);
- StreamInfo sortStreamInfo = new StreamInfo(streamId, nodes, nodeRelationships);
+ List<NodeRelation> nodeRelations = createNodeRelationsForStream(sourceResponses, sinkResponses);
+ StreamInfo sortStreamInfo = new StreamInfo(streamId, nodes, nodeRelations);
GroupInfo sortGroupInfo = new GroupInfo(groupId, Lists.newArrayList(sortStreamInfo));
String dataFlows = OBJECT_MAPPER.writeValueAsString(sortGroupInfo);
InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
@@ -157,17 +157,17 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
return nodes;
}
- private List<NodeRelationShip> createNodeRelationshipsForStream(
+ private List<NodeRelation> createNodeRelationsForStream(
List<SourceResponse> sourceResponses,
List<SinkResponse> sinkResponses) {
- NodeRelationShip relationship = new NodeRelationShip();
+ NodeRelation relation = new NodeRelation();
List<String> inputs = sourceResponses.stream().map(SourceResponse::getSourceName)
.collect(Collectors.toList());
List<String> outputs = sinkResponses.stream().map(SinkResponse::getSinkName)
.collect(Collectors.toList());
- relationship.setInputs(inputs);
- relationship.setOutputs(outputs);
- return Lists.newArrayList(relationship);
+ relation.setInputs(inputs);
+ relation.setOutputs(outputs);
+ return Lists.newArrayList(relation);
}
private void upsertDataFlow(InlongStreamInfo streamInfo, InlongStreamExtInfo extInfo, String keyName) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
similarity index 72%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
index 46216cc62..301914e73 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
@@ -36,7 +36,7 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.transformation.CascadeFunction;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
import org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
@@ -49,14 +49,14 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
- * Util for creat field relationship.
+ * Util for creat field relation.
*/
-public class FieldRelationShipUtils {
+public class FieldRelationUtils {
/**
- * Create relationship of fields.
+ * Create relation of fields.
*/
- public static List<FieldRelationShip> createFieldRelationShips(TransformResponse transformResponse) {
+ public static List<FieldRelation> createFieldRelations(TransformResponse transformResponse) {
TransformType transformType = TransformType.forType(transformResponse.getTransformType());
TransformDefinition transformDefinition = StreamParseUtils.parseTransformDefinition(
transformResponse.getTransformDefinition(), transformType);
@@ -66,15 +66,15 @@ public class FieldRelationShipUtils {
switch (transformType) {
case SPLITTER:
SplitterDefinition splitterDefinition = (SplitterDefinition) transformDefinition;
- return createSplitterFieldRelationShips(fieldList, transformName, splitterDefinition, preNodes);
+ return createSplitterFieldRelations(fieldList, transformName, splitterDefinition, preNodes);
case STRING_REPLACER:
StringReplacerDefinition replacerDefinition = (StringReplacerDefinition) transformDefinition;
- return createReplacerFieldRelationShips(fieldList, transformName, replacerDefinition, preNodes);
+ return createReplacerFieldRelations(fieldList, transformName, replacerDefinition, preNodes);
case DE_DUPLICATION:
case FILTER:
- return createFieldRelationShips(fieldList, transformName);
+ return createFieldRelations(fieldList, transformName);
case JOINER:
- return createJoinerFieldRelationShips(fieldList, transformName);
+ return createJoinerFieldRelations(fieldList, transformName);
default:
throw new UnsupportedOperationException(
String.format("Unsupported transformType=%s for Inlong", transformType));
@@ -82,9 +82,9 @@ public class FieldRelationShipUtils {
}
/**
- * Create relationship of fields.
+ * Create relation of fields.
*/
- private static List<FieldRelationShip> createFieldRelationShips(List<StreamField> fieldList, String transformName) {
+ private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, String transformName) {
return fieldList.stream()
.map(FieldInfoUtils::parseStreamField)
.map(fieldInfo -> {
@@ -92,15 +92,15 @@ public class FieldRelationShipUtils {
fieldInfo.getFormatInfo());
FieldInfo outputField = new FieldInfo(fieldInfo.getName(), transformName,
fieldInfo.getFormatInfo());
- return new FieldRelationShip(inputField, outputField);
+ return new FieldRelation(inputField, outputField);
}).collect(Collectors.toList());
}
/**
- * Create relationship of fields in join function.
+ * Create relation of fields in join function.
*/
- private static List<FieldRelationShip> createJoinerFieldRelationShips(List<StreamField> fieldList,
- String transformName) {
+ private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList,
+ String transformName) {
return fieldList.stream()
.map(streamField -> {
FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
@@ -109,20 +109,21 @@ public class FieldRelationShipUtils {
streamField.getOriginNodeName(), formatInfo);
FieldInfo outputField = new FieldInfo(streamField.getFieldName(),
transformName, formatInfo);
- return new FieldRelationShip(inputField, outputField);
+ return new FieldRelation(inputField, outputField);
}).collect(Collectors.toList());
}
/**
- * Create relationship of fields in split function.
+ * Create relation of fields in split function.
*/
- private static List<FieldRelationShip> createSplitterFieldRelationShips(List<StreamField> fieldList,
- String transformName, SplitterDefinition splitterDefinition, String preNodes) {
+ private static List<FieldRelation> createSplitterFieldRelations(
+ List<StreamField> fieldList, String transformName,
+ SplitterDefinition splitterDefinition, String preNodes) {
Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
String preNode = preNodes.split(",")[0];
List<SplitRule> splitRules = splitterDefinition.getSplitRules();
Set<String> splitFields = Sets.newHashSet();
- List<FieldRelationShip> fieldRelationships = splitRules.stream()
+ List<FieldRelation> fieldRelations = splitRules.stream()
.map(splitRule -> parseSplitRule(splitRule, splitFields, transformName, preNode))
.reduce(Lists.newArrayList(), (list1, list2) -> {
list1.addAll(list2);
@@ -131,57 +132,58 @@ public class FieldRelationShipUtils {
List<StreamField> filteredFieldList = fieldList.stream()
.filter(streamFieldInfo -> !splitFields.contains(streamFieldInfo.getFieldName()))
.collect(Collectors.toList());
- fieldRelationships.addAll(createFieldRelationShips(filteredFieldList, transformName));
- return fieldRelationships;
+ fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName));
+ return fieldRelations;
}
/**
- * Create relationship of fields in replace function.
+ * Create relation of fields in replace function.
*/
- private static List<FieldRelationShip> createReplacerFieldRelationShips(List<StreamField> fieldList,
- String transformName, StringReplacerDefinition replacerDefinition, String preNodes) {
+ private static List<FieldRelation> createReplacerFieldRelations(
+ List<StreamField> fieldList, String transformName,
+ StringReplacerDefinition replacerDefinition, String preNodes) {
Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
String preNode = preNodes.split(",")[0];
List<ReplaceRule> replaceRules = replacerDefinition.getReplaceRules();
Set<String> replaceFields = Sets.newHashSet();
- List<FieldRelationShip> fieldRelationships = replaceRules.stream()
+ List<FieldRelation> fieldRelations = replaceRules.stream()
.map(replaceRule -> parseReplaceRule(replaceRule, replaceFields, transformName, preNode))
.collect(Collectors.toList());
- fieldRelationships = cascadeFunctionRelationships(fieldRelationships);
+ fieldRelations = cascadeFunctionRelations(fieldRelations);
List<StreamField> filteredFieldList = fieldList.stream()
.filter(streamFieldInfo -> !replaceFields.contains(streamFieldInfo.getFieldName()))
.collect(Collectors.toList());
- fieldRelationships.addAll(createFieldRelationShips(filteredFieldList, transformName));
- return fieldRelationships;
+ fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName));
+ return fieldRelations;
}
/**
- * Create relationship of fields in cascade function.
+ * Create relation of fields in cascade function.
*/
- private static List<FieldRelationShip> cascadeFunctionRelationships(List<FieldRelationShip> fieldRelationships) {
+ private static List<FieldRelation> cascadeFunctionRelations(List<FieldRelation> fieldRelations) {
Map<String, List<CascadeFunction>> cascadeFunctions = Maps.newHashMap();
Map<String, FieldInfo> targetFields = Maps.newHashMap();
- for (FieldRelationShip fieldRelationship : fieldRelationships) {
- CascadeFunction cascadeFunction = (CascadeFunction) fieldRelationship.getInputField();
- String targetField = fieldRelationship.getOutputField().getName();
+ for (FieldRelation fieldRelation : fieldRelations) {
+ CascadeFunction cascadeFunction = (CascadeFunction) fieldRelation.getInputField();
+ String targetField = fieldRelation.getOutputField().getName();
cascadeFunctions.computeIfAbsent(targetField, k -> Lists.newArrayList()).add(cascadeFunction);
- targetFields.put(targetField, fieldRelationship.getOutputField());
+ targetFields.put(targetField, fieldRelation.getOutputField());
}
- List<FieldRelationShip> cascadeRelationships = Lists.newArrayList();
+ List<FieldRelation> cascadeRelations = Lists.newArrayList();
for (Map.Entry<String, List<CascadeFunction>> entry : cascadeFunctions.entrySet()) {
String targetField = entry.getKey();
CascadeFunctionWrapper functionWrapper = new CascadeFunctionWrapper(entry.getValue());
FieldInfo targetFieldInfo = targetFields.get(targetField);
- cascadeRelationships.add(new FieldRelationShip(functionWrapper, targetFieldInfo));
+ cascadeRelations.add(new FieldRelation(functionWrapper, targetFieldInfo));
}
- return cascadeRelationships;
+ return cascadeRelations;
}
/**
* Parse rule of replacer.
*/
- private static FieldRelationShip parseReplaceRule(ReplaceRule replaceRule, Set<String> replaceFields,
- String transformName, String preNode) {
+ private static FieldRelation parseReplaceRule(ReplaceRule replaceRule, Set<String> replaceFields,
+ String transformName, String preNode) {
StreamField sourceField = replaceRule.getSourceField();
final String fieldName = sourceField.getFieldName();
String regex = replaceRule.getRegex();
@@ -195,25 +197,25 @@ public class FieldRelationShipUtils {
if (replaceMode == ReplaceMode.RELACE_ALL) {
RegexpReplaceFunction regexpReplaceFunction = new RegexpReplaceFunction(fieldInfo,
new StringConstantParam(regex), new StringConstantParam(targetValue));
- return new FieldRelationShip(regexpReplaceFunction, targetFieldInfo);
+ return new FieldRelation(regexpReplaceFunction, targetFieldInfo);
} else {
RegexpReplaceFirstFunction regexpReplaceFirstFunction = new RegexpReplaceFirstFunction(fieldInfo,
new StringConstantParam(regex), new StringConstantParam(targetValue));
- return new FieldRelationShip(regexpReplaceFirstFunction, targetFieldInfo);
+ return new FieldRelation(regexpReplaceFirstFunction, targetFieldInfo);
}
}
/**
* Parse rule of split.
*/
- private static List<FieldRelationShip> parseSplitRule(SplitRule splitRule, Set<String> splitFields,
- String transformName, String preNode) {
+ private static List<FieldRelation> parseSplitRule(SplitRule splitRule, Set<String> splitFields,
+ String transformName, String preNode) {
StreamField sourceField = splitRule.getSourceField();
FieldInfo fieldInfo = FieldInfoUtils.parseStreamField(sourceField);
fieldInfo.setNodeId(preNode);
String seperator = splitRule.getSeperator();
List<String> targetSources = splitRule.getTargetFields();
- List<FieldRelationShip> splitRelationships = Lists.newArrayList();
+ List<FieldRelation> splitRelations = Lists.newArrayList();
for (int index = 0; index < targetSources.size(); index++) {
SplitIndexFunction splitIndexFunction = new SplitIndexFunction(
fieldInfo, new StringConstantParam(seperator), new ConstantParam(index));
@@ -221,8 +223,8 @@ public class FieldRelationShipUtils {
targetSources.get(index), transformName, FieldInfoUtils.convertFieldFormat(FieldType.STRING.name())
);
splitFields.add(targetSources.get(index));
- splitRelationships.add(new FieldRelationShip(splitIndexFunction, targetFieldInfo));
+ splitRelations.add(new FieldRelation(splitIndexFunction, targetFieldInfo));
}
- return splitRelationships;
+ return splitRelations;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index cd677e911..1b6ca5fde 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -42,7 +42,7 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.List;
import java.util.Map;
@@ -97,7 +97,7 @@ public class LoadNodeUtils {
List<FieldInfo> fieldInfos = fieldList.stream()
.map(field -> FieldInfoUtils.parseSinkFieldInfo(field, name))
.collect(Collectors.toList());
- List<FieldRelationShip> fieldRelationships = parseSinkFields(fieldList, name);
+ List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
Map<String, String> properties = kafkaSinkResponse.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
Integer sinkParallelism = null;
@@ -154,7 +154,7 @@ public class LoadNodeUtils {
List<FieldInfo> fields = fieldList.stream()
.map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
.collect(Collectors.toList());
- List<FieldRelationShip> fieldRelationships = parseSinkFields(fieldList, name);
+ List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
Map<String, String> properties = hiveSinkResponse.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
List<FieldInfo> partitionFields = Lists.newArrayList();
@@ -193,7 +193,7 @@ public class LoadNodeUtils {
List<FieldInfo> fields = fieldList.stream()
.map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
.collect(Collectors.toList());
- List<FieldRelationShip> fieldRelationships = parseSinkFields(fieldList, name);
+ List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
Map<String, String> properties = sinkResponse.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
return new HbaseLoadNode(
@@ -225,7 +225,7 @@ public class LoadNodeUtils {
List<FieldInfo> fields = fieldList.stream()
.map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
.collect(Collectors.toList());
- List<FieldRelationShip> fieldRelationships = parseSinkFields(fieldList, name);
+ List<FieldRelation> fieldRelationships = parseSinkFields(fieldList, name);
return new PostgresLoadNode(postgresSinkResponse.getSinkName(),
postgresSinkResponse.getSinkName(),
fields, fieldRelationships, null, null, 1,
@@ -245,7 +245,7 @@ public class LoadNodeUtils {
.map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse,
name))
.collect(Collectors.toList());
- List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
+ List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
return new ClickHouseLoadNode(clickHouseSinkResponse.getSinkName(),
clickHouseSinkResponse.getSinkName(),
fields, fieldRelationShips, null, null, 1,
@@ -258,7 +258,7 @@ public class LoadNodeUtils {
/**
* Parse information field of data sink.
*/
- public static List<FieldRelationShip> parseSinkFields(List<SinkField> fieldList, String sinkName) {
+ public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, String sinkName) {
if (CollectionUtils.isEmpty(fieldList)) {
return Lists.newArrayList();
}
@@ -274,7 +274,7 @@ public class LoadNodeUtils {
String sourceFieldType = field.getSourceFieldType();
FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
FieldInfoUtils.convertFieldFormat(sourceFieldType));
- return new FieldRelationShip(sourceField, sinkField);
+ return new FieldRelation(sourceField, sinkField);
}).collect(Collectors.toList());
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index fb8078384..b9791ea64 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -45,10 +45,10 @@ import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilter
import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelation;
import java.util.Iterator;
import java.util.List;
@@ -64,7 +64,7 @@ public class NodeRelationShipUtils {
/**
* Create node relationship for the given stream
*/
- public static List<NodeRelationShip> createNodeRelationShipsForStream(InlongStreamInfo streamInfo) {
+ public static List<NodeRelation> createNodeRelationShipsForStream(InlongStreamInfo streamInfo) {
String tempView = streamInfo.getExtParams();
if (StringUtils.isEmpty(tempView)) {
log.warn("StreamNodeRelationShip is empty for Stream={}", streamInfo);
@@ -73,7 +73,7 @@ public class NodeRelationShipUtils {
StreamPipeline pipeline = StreamParseUtils.parseStreamPipeline(streamInfo.getExtParams(),
streamInfo.getInlongStreamId());
return pipeline.getPipeline().stream()
- .map(nodeRelationship -> new NodeRelationShip(
+ .map(nodeRelationship -> new NodeRelation(
Lists.newArrayList(nodeRelationship.getInputNodes()),
Lists.newArrayList(nodeRelationship.getOutputNodes())))
.collect(Collectors.toList());
@@ -100,11 +100,11 @@ public class NodeRelationShipUtils {
return transformDefinition.getTransformType() == TransformType.JOINER;
}).collect(Collectors.toMap(TransformNode::getName, transformNode -> transformNode));
- List<NodeRelationShip> relationships = streamInfo.getRelations();
- Iterator<NodeRelationShip> shipIterator = relationships.listIterator();
- List<NodeRelationShip> joinRelationships = Lists.newArrayList();
+ List<NodeRelation> relationships = streamInfo.getRelations();
+ Iterator<NodeRelation> shipIterator = relationships.listIterator();
+ List<NodeRelation> joinRelationships = Lists.newArrayList();
while (shipIterator.hasNext()) {
- NodeRelationShip relationship = shipIterator.next();
+ NodeRelation relationship = shipIterator.next();
List<String> outputs = relationship.getOutputs();
if (outputs.size() == 1) {
String nodeName = outputs.get(0);
@@ -119,8 +119,8 @@ public class NodeRelationShipUtils {
relationships.addAll(joinRelationships);
}
- private static NodeRelationShip createNodeRelationShip(JoinerDefinition joinerDefinition,
- NodeRelationShip nodeRelationship) {
+ private static NodeRelation createNodeRelationShip(JoinerDefinition joinerDefinition,
+ NodeRelation nodeRelationship) {
JoinMode joinMode = joinerDefinition.getJoinMode();
String leftNode = getNodeName(joinerDefinition.getLeftNode());
String rightNode = getNodeName(joinerDefinition.getRightNode());
@@ -143,11 +143,11 @@ public class NodeRelationShipUtils {
joinConditions.put(rightNode, filterFunctions);
switch (joinMode) {
case LEFT_JOIN:
- return new LeftOuterJoinNodeRelationShip(preNodes, nodeRelationship.getOutputs(), joinConditions);
+ return new LeftOuterJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
case INNER_JOIN:
- return new InnerJoinNodeRelationShip(preNodes, nodeRelationship.getOutputs(), joinConditions);
+ return new InnerJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
case RIGHT_JOIN:
- return new RightOuterJoinNodeRelationShip(preNodes, nodeRelationship.getOutputs(), joinConditions);
+ return new RightOuterJoinNodeRelation(preNodes, nodeRelationship.getOutputs(), joinConditions);
default:
throw new IllegalArgumentException(String.format("Unsupported join mode=%s for inlong", joinMode));
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index 12d211996..4f480ac32 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -63,7 +63,7 @@ public class TransformNodeUtils {
* Create distinct node based on deDuplicationDefinition
*/
public static DistinctNode createDistinctNode(DeDuplicationDefinition deDuplicationDefinition,
- TransformResponse transformResponse) {
+ TransformResponse transformResponse) {
List<StreamField> streamFields = deDuplicationDefinition.getDupFields();
List<FieldInfo> distinctFields = streamFields.stream()
.map(FieldInfoUtils::parseStreamField)
@@ -87,7 +87,7 @@ public class TransformNodeUtils {
return new DistinctNode(transformNode.getId(),
transformNode.getName(),
transformNode.getFields(),
- transformNode.getFieldRelationShips(),
+ transformNode.getFieldRelations(),
transformNode.getFilters(),
transformNode.getFilterStrategy(),
distinctFields,
@@ -106,7 +106,7 @@ public class TransformNodeUtils {
List<FieldInfo> fieldInfos = transformResponse.getFieldList().stream()
.map(FieldInfoUtils::parseStreamField).collect(Collectors.toList());
transformNode.setFields(fieldInfos);
- transformNode.setFieldRelationShips(FieldRelationShipUtils.createFieldRelationShips(transformResponse));
+ transformNode.setFieldRelations(FieldRelationUtils.createFieldRelations(transformResponse));
transformNode.setFilters(
FilterFunctionUtils.createFilterFunctions(transformResponse));
transformNode.setFilterStrategy(FilterFunctionUtils.parseFilterStrategy(transformResponse));
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
index 56d728a15..d1506ad54 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
@@ -24,6 +24,7 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
/**
* built-in field info.
*/
+@Deprecated
public class BuiltInFieldInfo extends FieldInfo {
private static final long serialVersionUID = -3436204467879205139L;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
index 3f2e1e0d3..453a049b0 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/FieldInfo.java
@@ -30,8 +30,8 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.Objects;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
@@ -39,6 +39,7 @@ import java.util.Objects;
property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = FieldInfo.class, name = "base"),
+ @JsonSubTypes.Type(value = MetaFieldInfo.class, name = "metaField"),
@JsonSubTypes.Type(value = BuiltInFieldInfo.class, name = "builtin")
})
@Data
@@ -52,24 +53,31 @@ public class FieldInfo implements FunctionParam, Serializable {
private String nodeId;
@JsonIgnore
private String tableNameAlias;
+ /**
+ * It will be null if the field is a meta field
+ */
+ @Nullable
@JsonProperty("formatInfo")
private FormatInfo formatInfo;
public FieldInfo(
@JsonProperty("name") String name,
@JsonProperty("formatInfo") FormatInfo formatInfo) {
- this.name = Preconditions.checkNotNull(name);
- this.formatInfo = Preconditions.checkNotNull(formatInfo);
+ this(name, null, formatInfo);
+ }
+
+ public FieldInfo(@JsonProperty("name") String name) {
+ this(name, null, null);
}
@JsonCreator
public FieldInfo(
@JsonProperty("name") String name,
@JsonProperty("nodeId") String nodeId,
- @JsonProperty("formatInfo") FormatInfo formatInfo) {
+ @Nullable @JsonProperty("formatInfo") FormatInfo formatInfo) {
this.name = Preconditions.checkNotNull(name);
this.nodeId = nodeId;
- this.formatInfo = Preconditions.checkNotNull(formatInfo);
+ this.formatInfo = formatInfo;
}
@Override
@@ -86,22 +94,4 @@ public class FieldInfo implements FunctionParam, Serializable {
}
return formatName;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- FieldInfo fieldInfo = (FieldInfo) o;
- return name.equals(fieldInfo.name) && formatInfo.equals(fieldInfo.formatInfo);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, formatInfo);
- }
-
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
new file mode 100644
index 000000000..bd3d389d6
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/MetaFieldInfo.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol;
+
+import lombok.Getter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Meta field info.
+ */
+@Getter
+public class MetaFieldInfo extends FieldInfo {
+
+ private static final long serialVersionUID = -3436204467879205139L;
+
+ @JsonProperty("metaField")
+ private final MetaField metaField;
+
+ @JsonCreator
+ public MetaFieldInfo(
+ @JsonProperty("name") String name,
+ @JsonProperty("nodeId") String nodeId,
+ @JsonProperty("metaField") MetaField metaField) {
+ super(name, nodeId, null);
+ this.metaField = metaField;
+ }
+
+ public MetaFieldInfo(
+ @JsonProperty("name") String name,
+ @JsonProperty("metaField") MetaField metaField) {
+ super(name);
+ this.metaField = metaField;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ MetaFieldInfo that = (MetaFieldInfo) o;
+ return metaField == that.metaField
+ && super.equals(that);
+ }
+
+ public enum MetaField {
+
+ /**
+ * The process time of flink
+ */
+ PROCESS_TIME,
+ /**
+ * Name of the schema that contain the row, currently used for Oracle database
+ */
+ SCHEMA_NAME,
+ /**
+ * Name of the database that contain the row.
+ */
+ DATABASE_NAME,
+ /**
+ * Name of the table that contain the row.
+ */
+ TABLE_NAME,
+ /**
+ * It indicates the time that the change was made in the database.
+ * If the record is read from snapshot of the table instead of the change stream, the value is always 0
+ */
+ OP_TS,
+ /**
+ * Whether the DDL statement. Currently, it is used for MySQL database.
+ */
+ IS_DDL,
+ /**
+ * Type of database operation, such as INSERT/DELETE, etc. Currently, it is used for MySQL database.
+ */
+ OP_TYPE,
+ /**
+ * MySQL binlog data Row. Currently, it is used for MySQL database.
+ */
+ DATA,
+ /**
+ * The value of the field before update. Currently, it is used for MySQL database.
+ */
+ UPDATE_BEFORE,
+ /**
+ * Batch id of binlog. Currently, it is used for MySQL database.
+ */
+ BATCH_ID,
+ /**
+ * Mapping of sql_type table fields to java data type IDs. Currently, it is used for MySQL database.
+ */
+ SQL_TYPE,
+ /**
+ * The current time when the ROW was received and processed. Currently, it is used for MySQL database.
+ */
+ TS,
+ /**
+ * The table structure. It is only used for MySQL database
+ */
+ MYSQL_TYPE,
+ /**
+ * Primary key field name. Currently, it is used for MySQL database.
+ */
+ PK_NAMES
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
index 64bf40480..8bcdf8de6 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/StreamInfo.java
@@ -22,7 +22,7 @@ import lombok.Data;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import java.io.Serializable;
import java.util.List;
@@ -41,7 +41,7 @@ public class StreamInfo implements Serializable {
@JsonProperty("nodes")
private List<Node> nodes;
@JsonProperty("relations")
- private List<NodeRelationShip> relations;
+ private List<NodeRelation> relations;
/**
* Information of stream.
@@ -49,11 +49,11 @@ public class StreamInfo implements Serializable {
* @param streamId Uniquely identifies of GroupInfo
* @param nodes The node list that StreamInfo contains
* @param relations The relation list that StreamInfo contains,
- * it represents the relationship between nodes of StreamInfo
+ * it represents the relation between nodes of StreamInfo
*/
@JsonCreator
public StreamInfo(@JsonProperty("streamId") String streamId, @JsonProperty("nodes") List<Node> nodes,
- @JsonProperty("relations") List<NodeRelationShip> relations) {
+ @JsonProperty("relations") List<NodeRelation> relations) {
this.streamId = Preconditions.checkNotNull(streamId, "streamId is null");
this.nodes = Preconditions.checkNotNull(nodes, "nodes is null");
Preconditions.checkState(!nodes.isEmpty(), "nodes is empty");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
index 6fa212564..b17dbf535 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java
@@ -78,10 +78,10 @@ public abstract class ExtractNode implements Node {
@JsonCreator
public ExtractNode(@JsonProperty("id") String id,
- @JsonProperty("name") String name,
- @JsonProperty("fields") List<FieldInfo> fields,
- @Nullable @JsonProperty("watermark_field") WatermarkField watermarkField,
- @JsonProperty("properties") Map<String, String> properties) {
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @Nullable @JsonProperty("watermark_field") WatermarkField watermarkField,
+ @Nullable @JsonProperty("properties") Map<String, String> properties) {
this.id = Preconditions.checkNotNull(id, "id is null");
this.name = name;
this.fields = Preconditions.checkNotNull(fields, "fields is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 6fbfc24a8..283e5e5ac 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -34,11 +34,10 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
-import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import javax.annotation.Nullable;
@@ -61,8 +60,7 @@ import java.util.Map;
@JsonSubTypes.Type(value = ClickHouseLoadNode.class, name = "clickHouseLoad"),
@JsonSubTypes.Type(value = SqlServerLoadNode.class, name = "sqlserverLoad"),
@JsonSubTypes.Type(value = TDSQLPostgresLoadNode.class, name = "tdsqlPostgresLoad"),
- @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad"),
- @JsonSubTypes.Type(value = OracleLoadNode.class, name = "oracleLoad")
+ @JsonSubTypes.Type(value = MySqlLoadNode.class, name = "mysqlLoad")
})
@NoArgsConstructor
@Data
@@ -75,8 +73,8 @@ public abstract class LoadNode implements Node {
private String name;
@JsonProperty("fields")
private List<FieldInfo> fields;
- @JsonProperty("fieldRelationShips")
- private List<FieldRelationShip> fieldRelationShips;
+ @JsonProperty("fieldRelations")
+ private List<FieldRelation> fieldRelations;
@Nullable
@JsonInclude(Include.NON_NULL)
@JsonProperty("sinkParallelism")
@@ -94,20 +92,20 @@ public abstract class LoadNode implements Node {
@JsonCreator
public LoadNode(@JsonProperty("id") String id,
- @JsonProperty("name") String name,
- @JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
- @JsonProperty("filters") List<FilterFunction> filters,
- @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
- @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
- @Nullable @JsonProperty("properties") Map<String, String> properties) {
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @Nullable @JsonProperty("properties") Map<String, String> properties) {
this.id = Preconditions.checkNotNull(id, "id is null");
this.name = name;
this.fields = Preconditions.checkNotNull(fields, "fields is null");
Preconditions.checkState(!fields.isEmpty(), "fields is empty");
- this.fieldRelationShips = Preconditions.checkNotNull(fieldRelationShips,
- "fieldRelationShips is null");
- Preconditions.checkState(!fieldRelationShips.isEmpty(), "fieldRelationShips is empty");
+ this.fieldRelations = Preconditions.checkNotNull(fieldRelations,
+ "fieldRelations is null");
+ Preconditions.checkState(!fieldRelations.isEmpty(), "fieldRelations is empty");
this.filters = filters;
this.filterStrategy = filterStrategy;
this.sinkParallelism = sinkParallelism;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
index 8716f0bd8..92971a0db 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNode.java
@@ -28,7 +28,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import javax.annotation.Nonnull;
@@ -65,7 +65,7 @@ public class ClickHouseLoadNode extends LoadNode implements Serializable {
public ClickHouseLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -74,7 +74,7 @@ public class ClickHouseLoadNode extends LoadNode implements Serializable {
@Nonnull @JsonProperty("url") String url,
@Nonnull @JsonProperty("userName") String userName,
@Nonnull @JsonProperty("passWord") String password) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
this.url = Preconditions.checkNotNull(url, "url is null");
this.userName = Preconditions.checkNotNull(userName, "userName is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
index 172b80215..8839cceb3 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import javax.annotation.Nonnull;
@@ -70,7 +70,7 @@ public class FileSystemLoadNode extends LoadNode implements Serializable {
public FileSystemLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@Nonnull @JsonProperty("path") String path,
@Nonnull @JsonProperty("format") String format,
@@ -78,7 +78,7 @@ public class FileSystemLoadNode extends LoadNode implements Serializable {
@JsonProperty("properties") Map<String, String> properties,
@JsonProperty("parFields") List<FieldInfo> partitionFields,
@JsonProperty("serverTimeZone") String serverTimeZone) {
- super(id, name, fields, fieldRelationShips, filters, null, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, null, sinkParallelism, properties);
this.format = Preconditions.checkNotNull(format, "format type is null");
this.path = Preconditions.checkNotNull(path, "path is null");
this.partitionFields = partitionFields;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
index 3841454d9..1a28cad2a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNode.java
@@ -30,7 +30,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.HBaseConstant;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import java.io.Serializable;
@@ -76,7 +76,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
public HbaseLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -89,7 +89,7 @@ public class HbaseLoadNode extends LoadNode implements Serializable {
@JsonProperty("zookeeperZnodeParent") String zookeeperZnodeParent,
@JsonProperty("sinkBufferFlushMaxRows") String sinkBufferFlushMaxRows,
@JsonProperty("sinkBufferFlushInterval") String sinkBufferFlushInterval) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "tableName of hbase is null");
this.nameSpace = Preconditions.checkNotNull(nameSpace, "nameSpace of hbase is null");
this.zookeeperQuorum = Preconditions.checkNotNull(zookeeperQuorum, "zookeeperQuorum of hbase is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
index 90e3be5c6..151d5370f 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNode.java
@@ -29,7 +29,7 @@ import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import javax.annotation.Nonnull;
@@ -83,7 +83,7 @@ public class HiveLoadNode extends LoadNode implements Serializable {
public HiveLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -95,7 +95,7 @@ public class HiveLoadNode extends LoadNode implements Serializable {
@JsonProperty("hiveVersion") String hiveVersion,
@JsonProperty("hadoopConfDir") String hadoopConfDir,
@JsonProperty("parFields") List<FieldInfo> partitionFields) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.database = Preconditions.checkNotNull(database, "database of hive is null");
this.tableName = Preconditions.checkNotNull(tableName, "table of hive is null");
this.hiveConfDir = Preconditions.checkNotNull(hiveConfDir, "hive conf directory is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
index 2435a827a..2d649c288 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java
@@ -30,7 +30,7 @@ import org.apache.inlong.sort.protocol.constant.IcebergConstant;
import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import javax.annotation.Nonnull;
@@ -71,7 +71,7 @@ public class IcebergLoadNode extends LoadNode implements Serializable {
public IcebergLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -82,7 +82,7 @@ public class IcebergLoadNode extends LoadNode implements Serializable {
@JsonProperty("catalogType") IcebergConstant.CatalogType catalogType,
@JsonProperty("uri") String uri,
@JsonProperty("warehouse") String warehouse) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
this.primaryKey = primaryKey;
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 1542f5708..c65aa5dba 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -34,7 +34,7 @@ import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import javax.annotation.Nonnull;
@@ -72,7 +72,7 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
public KafkaLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@Nonnull @JsonProperty("topic") String topic,
@@ -81,7 +81,7 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
@Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
@JsonProperty("properties") Map<String, String> properties,
@JsonProperty("primaryKey") String primaryKey) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.topic = Preconditions.checkNotNull(topic, "topic is null");
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrapServers is null");
this.format = Preconditions.checkNotNull(format, "format is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNode.java
index 9ed44d7dc..243b96dc4 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNode.java
@@ -27,7 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import java.io.Serializable;
@@ -61,7 +61,7 @@ public class MySqlLoadNode extends LoadNode implements Serializable {
@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -71,7 +71,7 @@ public class MySqlLoadNode extends LoadNode implements Serializable {
@JsonProperty("password") String password,
@JsonProperty("tableName") String tableName,
@JsonProperty("primaryKey") String primaryKey) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.url = Preconditions.checkNotNull(url, "url is null");
this.username = Preconditions.checkNotNull(username, "username is null");
this.password = Preconditions.checkNotNull(password, "password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
index c725c07e1..26ef17ec1 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNode.java
@@ -27,7 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import java.io.Serializable;
@@ -61,7 +61,7 @@ public class OracleLoadNode extends LoadNode implements Serializable {
@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -71,7 +71,7 @@ public class OracleLoadNode extends LoadNode implements Serializable {
@JsonProperty("password") String password,
@JsonProperty("tableName") String tableName,
@JsonProperty("primaryKey") String primaryKey) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.url = Preconditions.checkNotNull(url, "url is null");
this.username = Preconditions.checkNotNull(username, "username is null");
this.password = Preconditions.checkNotNull(password, "password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNode.java
index 214479e0b..6c8e6d890 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNode.java
@@ -29,7 +29,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.PostgresConstant;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import java.io.Serializable;
@@ -73,7 +73,7 @@ public class PostgresLoadNode extends LoadNode implements Serializable {
@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -83,7 +83,7 @@ public class PostgresLoadNode extends LoadNode implements Serializable {
@JsonProperty("password") String password,
@JsonProperty("tableName") String tableName,
@JsonProperty("primaryKey") String primaryKey) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.url = Preconditions.checkNotNull(url, "url is null");
this.username = Preconditions.checkNotNull(username, "username is null");
this.password = Preconditions.checkNotNull(password, "password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNode.java
index 59a91be16..63b35e901 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNode.java
@@ -27,7 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import java.io.Serializable;
@@ -70,7 +70,7 @@ public class SqlServerLoadNode extends LoadNode implements Serializable {
public SqlServerLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -81,7 +81,7 @@ public class SqlServerLoadNode extends LoadNode implements Serializable {
@JsonProperty(value = "schemaName", defaultValue = "dbo") String schemaName,
@JsonProperty("tableName") String tableName,
@JsonProperty("primaryKey") String primaryKey) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.url = Preconditions.checkNotNull(url, "sqlserver url is null");
this.username = Preconditions.checkNotNull(username, "sqlserver user name is null");
this.password = Preconditions.checkNotNull(password, "sqlserver password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNode.java
index e0864dba1..9432180dd 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNode.java
@@ -29,7 +29,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.PostgresConstant;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import java.io.Serializable;
@@ -77,7 +77,7 @@ public class TDSQLPostgresLoadNode extends LoadNode implements Serializable {
@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@JsonProperty("sinkParallelism") Integer sinkParallelism,
@@ -87,7 +87,7 @@ public class TDSQLPostgresLoadNode extends LoadNode implements Serializable {
@JsonProperty("password") String password,
@JsonProperty("tableName") String tableName,
@JsonProperty("primaryKey") String primaryKey) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy, sinkParallelism, properties);
+ super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.url = Preconditions.checkNotNull(url, "url is null");
this.username = Preconditions.checkNotNull(username, "username is null");
this.password = Preconditions.checkNotNull(password, "password is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/DistinctNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/DistinctNode.java
index f1bcfa344..a54886645 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/DistinctNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/DistinctNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.OrderDirection;
@@ -63,7 +63,7 @@ public class DistinctNode extends TransformNode {
* @param id node id
* @param name node name
* @param fields The fields used to describe node schema
- * @param fieldRelationShips field relationShips ysed to describe node relationships
+ * @param fieldRelations field relations used to describe the relation between fields
* @param filters The filters used for data filter
* @param distinctFields The distinct fields used for partition
* @param orderField the order field used for sorting in partition
@@ -74,13 +74,13 @@ public class DistinctNode extends TransformNode {
public DistinctNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy,
@JsonProperty("distinctFields") List<FieldInfo> distinctFields,
@JsonProperty("orderField") FieldInfo orderField,
@JsonProperty("orderDirection") OrderDirection orderDirection) {
- super(id, name, fields, fieldRelationShips, filters, filterStrategy);
+ super(id, name, fields, fieldRelations, filters, filterStrategy);
this.distinctFields = Preconditions.checkNotNull(distinctFields, "distinctFields is null");
Preconditions.checkState(!distinctFields.isEmpty(), "distinct fields is empty");
this.orderField = Preconditions.checkNotNull(orderField, "orderField is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
index 8cfd272c8..886315b43 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/transform/TransformNode.java
@@ -30,7 +30,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import java.io.Serializable;
@@ -60,8 +60,8 @@ public class TransformNode implements Node, Serializable {
private String name;
@JsonProperty("fields")
private List<FieldInfo> fields;
- @JsonProperty("fieldRelationShips")
- private List<FieldRelationShip> fieldRelationShips;
+ @JsonProperty("fieldRelations")
+ private List<FieldRelation> fieldRelations;
@JsonProperty("filters")
@JsonInclude(Include.NON_NULL)
private List<FilterFunction> filters;
@@ -73,16 +73,16 @@ public class TransformNode implements Node, Serializable {
public TransformNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("fields") List<FieldInfo> fields,
- @JsonProperty("fieldRelationShips") List<FieldRelationShip> fieldRelationShips,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
@JsonProperty("filters") List<FilterFunction> filters,
@JsonProperty("filterStrategy") FilterStrategy filterStrategy) {
this.id = Preconditions.checkNotNull(id, "id is null");
this.name = name;
this.fields = Preconditions.checkNotNull(fields, "fields is null");
Preconditions.checkState(!fields.isEmpty(), "fields is empty");
- this.fieldRelationShips = Preconditions.checkNotNull(fieldRelationShips,
- "fieldRelationShips is null");
- Preconditions.checkState(!fieldRelationShips.isEmpty(), "fieldRelationShips is empty");
+ this.fieldRelations = Preconditions.checkNotNull(fieldRelations,
+ "fieldRelations is null");
+ Preconditions.checkState(!fieldRelations.isEmpty(), "fieldRelations is empty");
this.filters = filters;
this.filterStrategy = filterStrategy;
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
similarity index 85%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
index 7ae103342..1955dcb29 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java
@@ -27,17 +27,16 @@ import org.apache.inlong.sort.protocol.FieldInfo;
/**
- * defines the relationship between fields
- * from input to output field
+ * Defines the relation between fields from input to output field
*/
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
-@JsonTypeName("fieldRelationShip")
+@JsonTypeName("fieldRelation")
@Data
@NoArgsConstructor
-public class FieldRelationShip {
+public class FieldRelation {
@JsonProperty("inputField")
private FunctionParam inputField;
@@ -45,8 +44,8 @@ public class FieldRelationShip {
private FieldInfo outputField;
@JsonCreator
- public FieldRelationShip(@JsonProperty("inputField") FunctionParam inputField,
- @JsonProperty("outputField") FieldInfo outputField) {
+ public FieldRelation(@JsonProperty("inputField") FunctionParam inputField,
+ @JsonProperty("outputField") FieldInfo outputField) {
this.inputField = inputField;
this.outputField = outputField;
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelation.java
similarity index 81%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelation.java
index 71a042a2c..a32de63ec 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinRelation.java
@@ -29,18 +29,18 @@ import java.util.List;
import java.util.Map;
/**
- * Full outer join relationship class which defines the full outer join relationship
+ * Full outer join relation class which defines the full outer join relation
*/
@JsonTypeName("fullOuterJoin")
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public class FullOuterJoinRelationShip extends JoinRelationShip {
+public class FullOuterJoinRelation extends JoinRelation {
private static final long serialVersionUID = -2551119250767202829L;
/**
- * FullOuterJoinRelationShip Constructor
+ * FullOuterJoinRelation Constructor
*
* @param inputs The inputs is a list of input node id
* @param outputs The outputs is a list of output node id
@@ -49,9 +49,9 @@ public class FullOuterJoinRelationShip extends JoinRelationShip {
* the value of joinConditionMap is a list of join contidition
*/
@JsonCreator
- public FullOuterJoinRelationShip(@JsonProperty("inputs") List<String> inputs,
- @JsonProperty("outputs") List<String> outputs,
- @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
+ public FullOuterJoinRelation(@JsonProperty("inputs") List<String> inputs,
+ @JsonProperty("outputs") List<String> outputs,
+ @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
super(inputs, outputs, joinConditionMap);
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelation.java
similarity index 81%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelation.java
index 57c64cbd7..3c9703dc0 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelation.java
@@ -29,18 +29,18 @@ import java.util.List;
import java.util.Map;
/**
- * Inner join relationship class which defines the inner join relationship
+ * Inner join relation class which defines the inner join relation
*/
@JsonTypeName("innerJoin")
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public class InnerJoinNodeRelationShip extends JoinRelationShip {
+public class InnerJoinNodeRelation extends JoinRelation {
private static final long serialVersionUID = -5446480979888656724L;
/**
- * InnerJoinNodeRelationShip Constructor
+ * InnerJoinNodeRelation Constructor
*
* @param inputs The inputs is a list of input node id
* @param outputs The outputs is a list of output node id
@@ -49,9 +49,9 @@ public class InnerJoinNodeRelationShip extends JoinRelationShip {
* the value of joinConditionMap is a list of join contidition
*/
@JsonCreator
- public InnerJoinNodeRelationShip(@JsonProperty("inputs") List<String> inputs,
- @JsonProperty("outputs") List<String> outputs,
- @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
+ public InnerJoinNodeRelation(@JsonProperty("inputs") List<String> inputs,
+ @JsonProperty("outputs") List<String> outputs,
+ @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
super(inputs, outputs, joinConditionMap);
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
similarity index 75%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
index 961175221..0ea75faf7 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/JoinRelation.java
@@ -37,15 +37,15 @@ import java.util.Map;
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = FullOuterJoinRelationShip.class, name = "fullOuterJoin"),
- @JsonSubTypes.Type(value = InnerJoinNodeRelationShip.class, name = "innerJoin"),
- @JsonSubTypes.Type(value = LeftOuterJoinNodeRelationShip.class, name = "leftOuterJoin"),
- @JsonSubTypes.Type(value = RightOuterJoinNodeRelationShip.class, name = "rightOutJoin")
+ @JsonSubTypes.Type(value = FullOuterJoinRelation.class, name = "fullOuterJoin"),
+ @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"),
+ @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"),
+ @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin")
})
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public abstract class JoinRelationShip extends NodeRelationShip {
+public abstract class JoinRelation extends NodeRelation {
private static final long serialVersionUID = -213673939512251116L;
@@ -53,7 +53,7 @@ public abstract class JoinRelationShip extends NodeRelationShip {
private Map<String, List<FilterFunction>> joinConditionMap;
/**
- * JoinRelationShip Constructor
+ * JoinRelation Constructor
*
* @param inputs The inputs is a list of input node id
* @param outputs The outputs is a list of output node id
@@ -61,17 +61,17 @@ public abstract class JoinRelationShip extends NodeRelationShip {
* the key of joinConditionMap is the node id of join node
* the value of joinConditionMap is a list of join contidition
*/
- public JoinRelationShip(@JsonProperty("inputs") List<String> inputs,
- @JsonProperty("outputs") List<String> outputs,
- @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
+ public JoinRelation(@JsonProperty("inputs") List<String> inputs,
+ @JsonProperty("outputs") List<String> outputs,
+ @JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
super(inputs, outputs);
this.joinConditionMap = Preconditions.checkNotNull(joinConditionMap, "joinConditionMap is null");
Preconditions.checkState(!joinConditionMap.isEmpty(), "joinConditionMap is empty");
}
/**
- * Node relationship format
- * that is, the relationship is converted into a string representation of SQL
+ * Node relation format
+ * that is, the relation is converted into a string representation of SQL
*
* @return a string representation of SQL
*/
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelation.java
similarity index 88%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelation.java
index 193f2fc7d..e6a57ef20 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelation.java
@@ -29,18 +29,18 @@ import java.util.List;
import java.util.Map;
/**
- * Left outer join relationship class which defines the left outer join relationship
+ * Left outer join relation class which defines the left outer join relation
*/
@JsonTypeName("leftOuterJoin")
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public class LeftOuterJoinNodeRelationShip extends JoinRelationShip {
+public class LeftOuterJoinNodeRelation extends JoinRelation {
private static final long serialVersionUID = -2982848817690520421L;
/**
- * LeftOuterJoinNodeRelationShip Constructor
+ * LeftOuterJoinNodeRelation Constructor
*
* @param inputs The inputs is a list of input node id
* @param outputs The outputs is a list of output node id
@@ -49,7 +49,8 @@ public class LeftOuterJoinNodeRelationShip extends JoinRelationShip {
* the value of joinConditionMap is a list of join contidition
*/
@JsonCreator
- public LeftOuterJoinNodeRelationShip(@JsonProperty("inputs") List<String> inputs,
+ public LeftOuterJoinNodeRelation(
+ @JsonProperty("inputs") List<String> inputs,
@JsonProperty("outputs") List<String> outputs,
@JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
super(inputs, outputs, joinConditionMap);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
similarity index 72%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
index a2b0bb1e6..0179e315d 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelation.java
@@ -29,23 +29,23 @@ import java.io.Serializable;
import java.util.List;
/**
- * Node relationship base class which defines the simplest one-to-one relationship
+ * Node relation base class which defines the simplest one-to-one relation
*/
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = FullOuterJoinRelationShip.class, name = "fullOuterJoin"),
- @JsonSubTypes.Type(value = InnerJoinNodeRelationShip.class, name = "innerJoin"),
- @JsonSubTypes.Type(value = LeftOuterJoinNodeRelationShip.class, name = "leftOuterJoin"),
- @JsonSubTypes.Type(value = RightOuterJoinNodeRelationShip.class, name = "rightOutJoin"),
- @JsonSubTypes.Type(value = UnionNodeRelationShip.class, name = "union"),
- @JsonSubTypes.Type(value = NodeRelationShip.class, name = "baseRelation")
+ @JsonSubTypes.Type(value = FullOuterJoinRelation.class, name = "fullOuterJoin"),
+ @JsonSubTypes.Type(value = InnerJoinNodeRelation.class, name = "innerJoin"),
+ @JsonSubTypes.Type(value = LeftOuterJoinNodeRelation.class, name = "leftOuterJoin"),
+ @JsonSubTypes.Type(value = RightOuterJoinNodeRelation.class, name = "rightOutJoin"),
+ @JsonSubTypes.Type(value = UnionNodeRelation.class, name = "union"),
+ @JsonSubTypes.Type(value = NodeRelation.class, name = "baseRelation")
})
@Data
@NoArgsConstructor
-public class NodeRelationShip implements Serializable {
+public class NodeRelation implements Serializable {
private static final long serialVersionUID = 5491943876653981952L;
@@ -55,14 +55,14 @@ public class NodeRelationShip implements Serializable {
private List<String> outputs;
/**
- * NodeRelationShip Constructor
+ * NodeRelation Constructor
*
* @param inputs The inputs is a list of input node id
* @param outputs The outputs is a list of output node id
*/
@JsonCreator
- public NodeRelationShip(@JsonProperty("inputs") List<String> inputs,
- @JsonProperty("outputs") List<String> outputs) {
+ public NodeRelation(@JsonProperty("inputs") List<String> inputs,
+ @JsonProperty("outputs") List<String> outputs) {
this.inputs = Preconditions.checkNotNull(inputs, "inputs is null");
Preconditions.checkState(!inputs.isEmpty(), "inputs is empty");
this.outputs = Preconditions.checkNotNull(outputs, "outputs is null");
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelation.java
similarity index 76%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelation.java
index 636213f50..4919d3fed 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelation.java
@@ -29,27 +29,28 @@ import java.util.List;
import java.util.Map;
/**
- * Right outer join relationship class which defines the right outer join relationship
+ * Right outer join relation class which defines the right outer join relation
*/
@JsonTypeName("rightOutJoin")
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
-public class RightOuterJoinNodeRelationShip extends JoinRelationShip {
+public class RightOuterJoinNodeRelation extends JoinRelation {
private static final long serialVersionUID = 9202862229428483437L;
/**
- * RightOuterJoinNodeRelationShip Constructor
+ * RightOuterJoinNodeRelation Constructor
*
- * @param inputs The inputs is a list of input node id
- * @param outputs The outputs is a list of output node id
+ * @param inputs The inputs is a list of input node id
+ * @param outputs The outputs is a list of output node id
* @param joinConditionMap The joinConditionMap is a map of join conditions
- * the key of joinConditionMap is the node id of join node
- * the value of joinConditionMap is a list of join contidition
+ * the key of joinConditionMap is the node id of join node
+ * the value of joinConditionMap is a list of join contidition
*/
@JsonCreator
- public RightOuterJoinNodeRelationShip(@JsonProperty("inputs") List<String> inputs,
+ public RightOuterJoinNodeRelation(
+ @JsonProperty("inputs") List<String> inputs,
@JsonProperty("outputs") List<String> outputs,
@JsonProperty("joinConditionMap") Map<String, List<FilterFunction>> joinConditionMap) {
super(inputs, outputs, joinConditionMap);
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationShip.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelation.java
similarity index 83%
rename from inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationShip.java
rename to inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelation.java
index b5cb94a99..a92dbd9de 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationShip.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelation.java
@@ -25,23 +25,23 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
import java.util.List;
/**
- * Union relationship class which defines the union relationship
+ * Union relation class which defines the union relation
*/
@JsonTypeName("union")
@EqualsAndHashCode(callSuper = true)
@Data
-public class UnionNodeRelationShip extends NodeRelationShip {
+public class UnionNodeRelation extends NodeRelation {
private static final long serialVersionUID = 6602357131254518291L;
/**
- * UnionNodeRelationShip Constructor
+ * UnionNodeRelation Constructor
*
* @param inputs The inputs is a list of input node id
* @param outputs The outputs is a list of output node id
*/
- public UnionNodeRelationShip(@JsonProperty("inputs") List<String> inputs,
- @JsonProperty("outputs") List<String> outputs) {
+ public UnionNodeRelation(@JsonProperty("inputs") List<String> inputs,
+ @JsonProperty("outputs") List<String> outputs) {
super(inputs, outputs);
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
index 26fff58d0..e2ed364ec 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/FieldInfoTest.java
@@ -41,7 +41,7 @@ public class FieldInfoTest extends SerializeBaseTest<FieldInfo> {
@Test
public void testDeserializeWithNodeId() throws JsonProcessingException {
FieldInfo fieldInfo = new FieldInfo("field_name", StringFormatInfo.INSTANCE);
- fieldInfo.setNodeId("1L");
+ fieldInfo.setNodeId("1");
ObjectMapper objectMapper = new ObjectMapper();
String fieldInfoStr = "{\"type\":\"base\",\"name\":\"field_name\","
+ "\"formatInfo\":{\"type\":\"string\"},\"nodeId\":\"1\"}";
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
index 38684674f..777acacd5 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
@@ -27,12 +27,12 @@ import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import java.util.Arrays;
import java.util.Collections;
@@ -65,14 +65,14 @@ public class GroupInfoTest extends SerializeBaseTest<GroupInfo> {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
@@ -80,10 +80,10 @@ public class GroupInfoTest extends SerializeBaseTest<GroupInfo> {
1, null, "id");
}
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
@Override
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/MetaFieldInfoTest.java
similarity index 72%
copy from inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/MetaFieldInfoTest.java
index 0f920d533..fdf71f370 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/MetaFieldInfoTest.java
@@ -15,19 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.protocol.transformation.relation;
+package org.apache.inlong.sort.protocol;
import org.apache.inlong.sort.SerializeBaseTest;
-import java.util.Arrays;
-
/**
- * Tests for {@link NodeRelationShip}
+ * Test for {@link MetaFieldInfo}
*/
-public class NodeRelationTest extends SerializeBaseTest<NodeRelationShip> {
+public class MetaFieldInfoTest extends SerializeBaseTest<MetaFieldInfo> {
@Override
- public NodeRelationShip getTestObject() {
- return new NodeRelationShip(Arrays.asList("1", "2"), Arrays.asList("3", "4"));
+ public MetaFieldInfo getTestObject() {
+ return new MetaFieldInfo("f1", MetaFieldInfo.MetaField.DATABASE_NAME);
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
index f9f8cf3e7..45a09c5e0 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
@@ -28,12 +28,12 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import java.util.Arrays;
import java.util.Collections;
@@ -66,14 +66,14 @@ public class StreamInfoTest extends SerializeBaseTest<StreamInfo> {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
@@ -87,14 +87,14 @@ public class StreamInfoTest extends SerializeBaseTest<StreamInfo> {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new HiveLoadNode("2", "hive_output", fields, relations, null, null,
@@ -102,10 +102,10 @@ public class StreamInfoTest extends SerializeBaseTest<StreamInfo> {
null, Arrays.asList(new FieldInfo("day", new LongFormatInfo())));
}
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
index f56a02650..dca17a878 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/ClickHouseLoadNodeTest.java
@@ -22,7 +22,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.Node;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Arrays;
@@ -35,7 +35,7 @@ public class ClickHouseLoadNodeTest extends SerializeBaseTest<Node> {
return new ClickHouseLoadNode("2", "test_clickhouse",
Arrays.asList(new FieldInfo("id", new StringFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("id", new StringFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("id", new StringFormatInfo()),
new FieldInfo("id", new StringFormatInfo()))),
null,
null,
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
index 483d26757..9775ac297 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HbaseLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Arrays;
@@ -31,7 +31,7 @@ public class HbaseLoadNodeTest extends SerializeBaseTest<HbaseLoadNode> {
public HbaseLoadNode getTestObject() {
return new HbaseLoadNode("2", "test_hbase",
Arrays.asList(new FieldInfo("cf:id", new StringFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("id", new StringFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("id", new StringFormatInfo()),
new FieldInfo("cf:id", new StringFormatInfo()))), null, null, 1, null, "mytable", "default",
"localhost:2181", "MD5(`id`)", null, null, null, null);
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
index 012e3dd05..1bd5aef17 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HiveLoadNodeTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Arrays;
import java.util.HashMap;
@@ -35,7 +35,7 @@ public class HiveLoadNodeTest extends SerializeBaseTest<HiveLoadNode> {
public HiveLoadNode getTestObject() {
return new HiveLoadNode("1", "test_hive_node",
Arrays.asList(new FieldInfo("field", new StringFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("field", new StringFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("field", new StringFormatInfo()),
new FieldInfo("field", new StringFormatInfo()))), null, null,
1, new HashMap<>(), "myHive", "default",
"test", "/opt/hive-conf", "3.1.2",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
index 0ff7316c7..3e93fc9f3 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Arrays;
import java.util.TreeMap;
@@ -35,7 +35,7 @@ public class KafkaLoadNodeTest extends SerializeBaseTest<KafkaLoadNode> {
public KafkaLoadNode getTestObject() {
return new KafkaLoadNode("1", null,
Arrays.asList(new FieldInfo("field", new StringFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("field", new StringFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("field", new StringFormatInfo()),
new FieldInfo("field", new StringFormatInfo()))), null, null,
"topic", "localhost:9092", new CanalJsonFormat(),
1, new TreeMap<>(), null);
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNodeTest.java
index 83e47a8f9..d5ff93a25 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/MySqlLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Collections;
@@ -34,7 +34,7 @@ public class MySqlLoadNodeTest extends SerializeBaseTest<MySqlLoadNode> {
public MySqlLoadNode getTestObject() {
return new MySqlLoadNode("1", "mysql_output",
Collections.singletonList(new FieldInfo("name", new StringFormatInfo())),
- Collections.singletonList(new FieldRelationShip(new FieldInfo("name",
+ Collections.singletonList(new FieldRelation(new FieldInfo("name",
new StringFormatInfo()), new FieldInfo("name", new StringFormatInfo()))),
null, null, 1, null,
"jdbc:mysql://localhost:3306/inlong",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
index ca972fe23..c9e73a63d 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/OracleLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Collections;
@@ -34,7 +34,7 @@ public class OracleLoadNodeTest extends SerializeBaseTest<OracleLoadNode> {
public OracleLoadNode getTestObject() {
return new OracleLoadNode("1", "mysql_output",
Collections.singletonList(new FieldInfo("NAME", new StringFormatInfo())),
- Collections.singletonList(new FieldRelationShip(new FieldInfo("name",
+ Collections.singletonList(new FieldRelation(new FieldInfo("name",
new StringFormatInfo()), new FieldInfo("NAME", new StringFormatInfo()))),
null, null, 1, null,
"jdbc:oracle:thin:@localhost:1521:xe",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNodeTest.java
index 8e01fdf5e..e854bffdc 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/PostgresLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Arrays;
@@ -39,7 +39,7 @@ public class PostgresLoadNodeTest extends SerializeBaseTest<PostgresLoadNode> {
public PostgresLoadNode getTestObject() {
return new PostgresLoadNode("1", "postgres_output", Arrays.asList(new FieldInfo("name",
new StringFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo()))), null, null, 1, null,
"jdbc:postgresql://localhost:5432/postgres",
"postgres",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNodeTest.java
index e99328cf4..75f529671 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/SqlServerLoadNodeTest.java
@@ -22,7 +22,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Arrays;
import java.util.List;
@@ -36,10 +36,10 @@ public class SqlServerLoadNodeTest extends SerializeBaseTest<SqlServerLoadNode>
public SqlServerLoadNode getTestObject() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("name", new StringFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo()))
);
return new SqlServerLoadNode("1", "sqlserver_out", fields, relations, null, null, 1,
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNodeTest.java
index b6a26319b..fe2d9c72f 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/TDSQLPostgresLoadNodeTest.java
@@ -21,7 +21,7 @@ package org.apache.inlong.sort.protocol.node.load;
import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.Arrays;
@@ -39,7 +39,7 @@ public class TDSQLPostgresLoadNodeTest extends SerializeBaseTest<TDSQLPostgresLo
public TDSQLPostgresLoadNode getTestObject() {
return new TDSQLPostgresLoadNode("1", "tdsqlPostgres_output", Arrays.asList(new FieldInfo("name",
new StringFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo()))), null, null, 1, null,
"jdbc:postgresql://localhost:5432/postgres",
"postgres",
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
index 7f308271c..bc9c2cb22 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/transform/DistinctNodeTest.java
@@ -21,7 +21,7 @@ import org.apache.inlong.sort.SerializeBaseTest;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.OrderDirection;
import java.util.Arrays;
@@ -39,13 +39,13 @@ public class DistinctNodeTest extends SerializeBaseTest<DistinctNode> {
new FieldInfo("f3", new StringFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
Arrays.asList(
- new FieldRelationShip(new FieldInfo("f1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("f1", new StringFormatInfo()),
new FieldInfo("f1", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("f2", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("f2", new StringFormatInfo()),
new FieldInfo("f2", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("f3", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("f3", new StringFormatInfo()),
new FieldInfo("f3", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new StringFormatInfo()),
new FieldInfo("ts", new StringFormatInfo()))
),
null, null,
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShipTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationTest.java
similarity index 82%
rename from inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShipTest.java
rename to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationTest.java
index 8290bf429..fb4db7fca 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationShipTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/FieldRelationTest.java
@@ -22,9 +22,9 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
/**
- * Test for {@link FieldRelationShip}
+ * Test for {@link FieldRelation}
*/
-public class FieldRelationShipTest extends SerializeBaseTest<FieldRelationShip> {
+public class FieldRelationTest extends SerializeBaseTest<FieldRelation> {
/**
* Get test object
@@ -32,8 +32,8 @@ public class FieldRelationShipTest extends SerializeBaseTest<FieldRelationShip>
* @return The test object
*/
@Override
- public FieldRelationShip getTestObject() {
- return new FieldRelationShip(new FieldInfo("f", StringFormatInfo.INSTANCE),
+ public FieldRelation getTestObject() {
+ return new FieldRelation(new FieldInfo("f", StringFormatInfo.INSTANCE),
new FieldInfo("f", StringFormatInfo.INSTANCE));
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinNodeRelationTest.java
index 5f6b85b80..67e54d4c9 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/FullOuterJoinNodeRelationTest.java
@@ -35,12 +35,12 @@ import java.util.Map;
import java.util.TreeMap;
/**
- * Tests for {@link FullOuterJoinRelationShip}
+ * Tests for {@link FullOuterJoinRelation}
*/
-public class FullOuterJoinNodeRelationTest extends SerializeBaseTest<FullOuterJoinRelationShip> {
+public class FullOuterJoinNodeRelationTest extends SerializeBaseTest<FullOuterJoinRelation> {
@Override
- public FullOuterJoinRelationShip getTestObject() {
+ public FullOuterJoinRelation getTestObject() {
Map<String, List<FilterFunction>> joinConditionMap = new TreeMap<>();
joinConditionMap.put("2", Arrays.asList(
new SingleValueFilterFunction(EmptyOperator.getInstance(),
@@ -58,7 +58,7 @@ public class FullOuterJoinNodeRelationTest extends SerializeBaseTest<FullOuterJo
new SingleValueFilterFunction(AndOperator.getInstance(),
new FieldInfo("name", "3", new StringFormatInfo()),
NotEqualOperator.getInstance(), new ConstantParam("test"))));
- return new FullOuterJoinRelationShip(Arrays.asList("1", "2", "3"),
+ return new FullOuterJoinRelation(Arrays.asList("1", "2", "3"),
Collections.singletonList("4"), joinConditionMap);
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationTest.java
index 0e15fe440..aef4c07f8 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/InnerJoinNodeRelationTest.java
@@ -35,12 +35,12 @@ import java.util.Map;
import java.util.TreeMap;
/**
- * Tests for {@link InnerJoinNodeRelationShip}
+ * Tests for {@link InnerJoinNodeRelation}
*/
-public class InnerJoinNodeRelationTest extends SerializeBaseTest<InnerJoinNodeRelationShip> {
+public class InnerJoinNodeRelationTest extends SerializeBaseTest<InnerJoinNodeRelation> {
@Override
- public InnerJoinNodeRelationShip getTestObject() {
+ public InnerJoinNodeRelation getTestObject() {
Map<String, List<FilterFunction>> joinConditionMap = new TreeMap<>();
joinConditionMap.put("2", Arrays.asList(
new SingleValueFilterFunction(EmptyOperator.getInstance(),
@@ -58,7 +58,7 @@ public class InnerJoinNodeRelationTest extends SerializeBaseTest<InnerJoinNodeRe
new SingleValueFilterFunction(AndOperator.getInstance(),
new FieldInfo("name", "3", new StringFormatInfo()),
NotEqualOperator.getInstance(), new ConstantParam("test"))));
- return new InnerJoinNodeRelationShip(Arrays.asList("1", "2", "3"),
+ return new InnerJoinNodeRelation(Arrays.asList("1", "2", "3"),
Collections.singletonList("4"), joinConditionMap);
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationTest.java
index c52760e49..2cc6adbaa 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/LeftOuterJoinNodeRelationTest.java
@@ -35,12 +35,12 @@ import java.util.Map;
import java.util.TreeMap;
/**
- * Tests for {@link LeftOuterJoinNodeRelationShip}
+ * Tests for {@link LeftOuterJoinNodeRelation}
*/
-public class LeftOuterJoinNodeRelationTest extends SerializeBaseTest<LeftOuterJoinNodeRelationShip> {
+public class LeftOuterJoinNodeRelationTest extends SerializeBaseTest<LeftOuterJoinNodeRelation> {
@Override
- public LeftOuterJoinNodeRelationShip getTestObject() {
+ public LeftOuterJoinNodeRelation getTestObject() {
Map<String, List<FilterFunction>> joinConditionMap = new TreeMap<>();
joinConditionMap.put("2", Arrays.asList(
new SingleValueFilterFunction(EmptyOperator.getInstance(),
@@ -58,7 +58,7 @@ public class LeftOuterJoinNodeRelationTest extends SerializeBaseTest<LeftOuterJo
new SingleValueFilterFunction(AndOperator.getInstance(),
new FieldInfo("name", "3", new StringFormatInfo()),
NotEqualOperator.getInstance(), new ConstantParam("test"))));
- return new LeftOuterJoinNodeRelationShip(Arrays.asList("1", "2", "3"),
+ return new LeftOuterJoinNodeRelation(Arrays.asList("1", "2", "3"),
Collections.singletonList("4"), joinConditionMap);
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
index 0f920d533..3827ec9ae 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/NodeRelationTest.java
@@ -22,12 +22,12 @@ import org.apache.inlong.sort.SerializeBaseTest;
import java.util.Arrays;
/**
- * Tests for {@link NodeRelationShip}
+ * Tests for {@link NodeRelation}
*/
-public class NodeRelationTest extends SerializeBaseTest<NodeRelationShip> {
+public class NodeRelationTest extends SerializeBaseTest<NodeRelation> {
@Override
- public NodeRelationShip getTestObject() {
- return new NodeRelationShip(Arrays.asList("1", "2"), Arrays.asList("3", "4"));
+ public NodeRelation getTestObject() {
+ return new NodeRelation(Arrays.asList("1", "2"), Arrays.asList("3", "4"));
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationTest.java
index fb4bb0647..fb57d629f 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/RightOuterJoinNodeRelationTest.java
@@ -34,12 +34,12 @@ import java.util.Map;
import java.util.TreeMap;
/**
- * Tests for {@link RightOuterJoinNodeRelationShip}
+ * Tests for {@link RightOuterJoinNodeRelation}
*/
-public class RightOuterJoinNodeRelationTest extends SerializeBaseTest<RightOuterJoinNodeRelationShip> {
+public class RightOuterJoinNodeRelationTest extends SerializeBaseTest<RightOuterJoinNodeRelation> {
@Override
- public RightOuterJoinNodeRelationShip getTestObject() {
+ public RightOuterJoinNodeRelation getTestObject() {
Map<String, List<FilterFunction>> joinConditionMap = new TreeMap<>();
joinConditionMap.put("2", Arrays.asList(
new SingleValueFilterFunction(EmptyOperator.getInstance(),
@@ -57,7 +57,7 @@ public class RightOuterJoinNodeRelationTest extends SerializeBaseTest<RightOuter
new SingleValueFilterFunction(AndOperator.getInstance(),
new FieldInfo("name", "3", new StringFormatInfo()),
NotEqualOperator.getInstance(), new ConstantParam("test"))));
- return new RightOuterJoinNodeRelationShip(Arrays.asList("1", "2", "3"),
+ return new RightOuterJoinNodeRelation(Arrays.asList("1", "2", "3"),
Arrays.asList("4"), joinConditionMap);
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationTest.java
index 1849bcc3b..e3de33a6e 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/relation/UnionNodeRelationTest.java
@@ -23,13 +23,13 @@ import java.util.Arrays;
import java.util.Collections;
/**
- * Tests for {@link UnionNodeRelationShip}
+ * Tests for {@link UnionNodeRelation}
*/
-public class UnionNodeRelationTest extends SerializeBaseTest<UnionNodeRelationShip> {
+public class UnionNodeRelationTest extends SerializeBaseTest<UnionNodeRelation> {
@Override
- public UnionNodeRelationShip getTestObject() {
- return new UnionNodeRelationShip(Arrays.asList("1", "2"),
+ public UnionNodeRelation getTestObject() {
+ return new UnionNodeRelation(Arrays.asList("1", "2"),
Collections.singletonList("3"));
}
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 396b4cc90..8ef086c6e 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -29,6 +29,7 @@ import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
import org.apache.inlong.sort.protocol.node.ExtractNode;
@@ -41,13 +42,13 @@ import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.Function;
import org.apache.inlong.sort.protocol.transformation.FunctionParam;
-import org.apache.inlong.sort.protocol.transformation.relation.JoinRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.UnionNodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.JoinRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.UnionNodeRelation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,7 +132,7 @@ public class FlinkSqlParser implements Parser {
/**
* Parse stream
*
- * @param streamInfo The encapsulation of nodes and node relationships
+ * @param streamInfo The encapsulation of nodes and node relations
*/
private void parseStream(StreamInfo streamInfo) {
Preconditions.checkNotNull(streamInfo, "stream is null");
@@ -146,7 +147,7 @@ public class FlinkSqlParser implements Parser {
Preconditions.checkNotNull(s.getId(), "node id is null");
nodeMap.put(s.getId(), s);
});
- Map<String, NodeRelationShip> relationMap = new HashMap<String, NodeRelationShip>();
+ Map<String, NodeRelation> relationMap = new HashMap<String, NodeRelation>();
streamInfo.getRelations().forEach(r -> {
for (String output : r.getOutputs()) {
relationMap.put(output, r);
@@ -161,15 +162,15 @@ public class FlinkSqlParser implements Parser {
/**
* parse node relation
* <p>
- * Here we only parse the output node in the relationship,
+ * Here we only parse the output node in the relation,
* and the input node parsing is achieved by parsing the dependent node parsing of the output node.
*
- * @param relation Define relationships between nodes, it also shows the data flow
- * @param nodeMap Store the mapping relationship between node id and node
- * @param relationMap Store the mapping relationship between node id and relation
+ * @param relation Define relations between nodes, it also shows the data flow
+ * @param nodeMap Store the mapping relation between node id and node
+ * @param relationMap Store the mapping relation between node id and relation
*/
- private void parseNodeRelation(NodeRelationShip relation, Map<String, Node> nodeMap,
- Map<String, NodeRelationShip> relationMap) {
+ private void parseNodeRelation(NodeRelation relation, Map<String, Node> nodeMap,
+ Map<String, NodeRelation> relationMap) {
log.info("start parse node relation, relation:{}", relation);
Preconditions.checkNotNull(relation, "relation is null");
Preconditions.checkState(relation.getInputs().size() > 0,
@@ -201,12 +202,12 @@ public class FlinkSqlParser implements Parser {
* Parse a node and recursively resolve its dependent nodes
*
* @param node The abstract of extract, transform, load
- * @param relation Define relationships between nodes, it also shows the data flow
- * @param nodeMap store the mapping relationship between node id and node
- * @param relationMap Store the mapping relationship between node id and relation
+ * @param relation Define relations between nodes, it also shows the data flow
+ * @param nodeMap store the mapping relation between node id and node
+ * @param relationMap Store the mapping relation between node id and relation
*/
- private void parseNode(Node node, NodeRelationShip relation, Map<String, Node> nodeMap,
- Map<String, NodeRelationShip> relationMap) {
+ private void parseNode(Node node, NodeRelation relation, Map<String, Node> nodeMap,
+ Map<String, NodeRelation> relationMap) {
if (hasParsedSet.contains(node.getId())) {
log.warn("the node has already been parsed, node id:{}", node.getId());
return;
@@ -240,31 +241,31 @@ public class FlinkSqlParser implements Parser {
hasParsedSet.add(node.getId());
} else if (node instanceof TransformNode) {
TransformNode transformNode = (TransformNode) node;
- Preconditions.checkNotNull(transformNode.getFieldRelationShips(),
+ Preconditions.checkNotNull(transformNode.getFieldRelations(),
"field relations is null");
- Preconditions.checkState(!transformNode.getFieldRelationShips().isEmpty(),
+ Preconditions.checkState(!transformNode.getFieldRelations().isEmpty(),
"field relations is empty");
String createSql = genCreateSql(node);
log.info("node id:{}, create table sql:\n{}", node.getId(), createSql);
String selectSql;
- if (relation instanceof JoinRelationShip) {
- // parse join relation ship and generate the transform sql
+ if (relation instanceof JoinRelation) {
+ // parse join relation and generate the transform sql
Preconditions.checkState(relation.getInputs().size() > 1,
"join must have more than one input nodes");
Preconditions.checkState(relation.getOutputs().size() == 1,
"join node only support one output node");
- JoinRelationShip joinRelation = (JoinRelationShip) relation;
+ JoinRelation joinRelation = (JoinRelation) relation;
selectSql = genJoinSelectSql(transformNode, joinRelation, nodeMap);
- } else if (relation instanceof UnionNodeRelationShip) {
- // parse union relation ship and generate the transform sql
+ } else if (relation instanceof UnionNodeRelation) {
+ // parse union relation and generate the transform sql
Preconditions.checkState(relation.getInputs().size() > 1,
"union must have more than one input nodes");
Preconditions.checkState(relation.getOutputs().size() == 1,
"join node only support one output node");
- UnionNodeRelationShip unionRelation = (UnionNodeRelationShip) relation;
+ UnionNodeRelation unionRelation = (UnionNodeRelation) relation;
selectSql = genUnionNodeSelectSql(transformNode, unionRelation, nodeMap);
} else {
- // parse base relation ship that one to one and generate the transform sql
+ // parse base relation that one to one and generate the transform sql
Preconditions.checkState(relation.getInputs().size() == 1,
"simple transform only support one input node");
Preconditions.checkState(relation.getOutputs().size() == 1,
@@ -284,16 +285,16 @@ public class FlinkSqlParser implements Parser {
*
* @param transformNode The transform node
* @param unionRelation The union relation of sql
- * @param nodeMap Store the mapping relationship between node id and node
+ * @param nodeMap Store the mapping relation between node id and node
* @return Transform sql for this transform logic
*/
private String genUnionNodeSelectSql(TransformNode transformNode,
- UnionNodeRelationShip unionRelation, Map<String, Node> nodeMap) {
+ UnionNodeRelation unionRelation, Map<String, Node> nodeMap) {
throw new UnsupportedOperationException("Union is not currently supported");
}
private String genJoinSelectSql(TransformNode node,
- JoinRelationShip relation, Map<String, Node> nodeMap) {
+ JoinRelation relation, Map<String, Node> nodeMap) {
// Get tablename alias map by input nodes
Map<String, String> tableNameAliasMap = new HashMap<>(relation.getInputs().size());
relation.getInputs().forEach(s -> {
@@ -303,9 +304,9 @@ public class FlinkSqlParser implements Parser {
});
StringBuilder sb = new StringBuilder();
sb.append("SELECT ");
- Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(node.getFieldRelationShips().size());
- // Generate mapping for output field to FieldRelationShip
- node.getFieldRelationShips().forEach(s -> {
+ Map<String, FieldRelation> fieldRelationMap = new HashMap<>(node.getFieldRelations().size());
+ // Generate mapping for output field to FieldRelation
+ node.getFieldRelations().forEach(s -> {
fillOutTableNameAlias(Collections.singletonList(s.getInputField()), tableNameAliasMap);
fieldRelationMap.put(s.getOutputField().getName(), s);
});
@@ -355,7 +356,7 @@ public class FlinkSqlParser implements Parser {
*
* @param params The params used in filter, join condition, transform function etc.
* @param tableNameAliasMap The tablename alias map,
- * contains all tablename alias used in this relationship of nodes
+ * contains all tablename alias used in this relation of nodes
*/
private void fillOutTableNameAlias(List<FunctionParam> params, Map<String, String> tableNameAliasMap) {
for (FunctionParam param : params) {
@@ -416,16 +417,16 @@ public class FlinkSqlParser implements Parser {
* Generate the most basic conversion sql one-to-one
*
* @param node The transform node
- * @param relation Define relationships between nodes, it also shows the data flow
- * @param nodeMap Store the mapping relationship between node id and node
+ * @param relation Define relations between nodes, it also shows the data flow
+ * @param nodeMap Store the mapping relation between node id and node
* @return Transform sql for this transform logic
*/
private String genSimpleTransformSelectSql(TransformNode node,
- NodeRelationShip relation, Map<String, Node> nodeMap) {
+ NodeRelation relation, Map<String, Node> nodeMap) {
StringBuilder sb = new StringBuilder();
sb.append("SELECT ");
- Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(node.getFieldRelationShips().size());
- node.getFieldRelationShips().forEach(s -> {
+ Map<String, FieldRelation> fieldRelationMap = new HashMap<>(node.getFieldRelations().size());
+ node.getFieldRelations().forEach(s -> {
fieldRelationMap.put(s.getOutputField().getName(), s);
});
parseFieldRelations(node.getFields(), fieldRelationMap, sb);
@@ -468,9 +469,9 @@ public class FlinkSqlParser implements Parser {
* @param sb Container for storing sql
*/
private void parseFieldRelations(List<FieldInfo> fields,
- Map<String, FieldRelationShip> fieldRelationMap, StringBuilder sb) {
+ Map<String, FieldRelation> fieldRelationMap, StringBuilder sb) {
for (FieldInfo field : fields) {
- FieldRelationShip fieldRelation = fieldRelationMap.get(field.getName());
+ FieldRelation fieldRelation = fieldRelationMap.get(field.getName());
if (fieldRelation != null) {
sb.append("\n ").append(fieldRelation.getInputField().format())
.append(" AS ").append(field.format()).append(",");
@@ -490,8 +491,8 @@ public class FlinkSqlParser implements Parser {
* @return Insert sql
*/
private String genLoadNodeInsertSql(LoadNode loadNode, Node inputNode) {
- Preconditions.checkNotNull(loadNode.getFieldRelationShips(), "field relations is null");
- Preconditions.checkState(!loadNode.getFieldRelationShips().isEmpty(),
+ Preconditions.checkNotNull(loadNode.getFieldRelations(), "field relations is null");
+ Preconditions.checkState(!loadNode.getFieldRelations().isEmpty(),
"field relations is empty");
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO `").append(loadNode.genTableName()).append("` ");
@@ -499,8 +500,8 @@ public class FlinkSqlParser implements Parser {
if (loadNode instanceof HbaseLoadNode) {
parseHbaseLoadFieldRelation((HbaseLoadNode) loadNode, sb);
} else {
- Map<String, FieldRelationShip> fieldRelationMap = new HashMap<>(loadNode.getFieldRelationShips().size());
- loadNode.getFieldRelationShips().forEach(s -> {
+ Map<String, FieldRelation> fieldRelationMap = new HashMap<>(loadNode.getFieldRelations().size());
+ loadNode.getFieldRelations().forEach(s -> {
fieldRelationMap.put(s.getOutputField().getName(), s);
});
parseFieldRelations(loadNode.getFields(), fieldRelationMap, sb);
@@ -512,13 +513,13 @@ public class FlinkSqlParser implements Parser {
private void parseHbaseLoadFieldRelation(HbaseLoadNode hbaseLoadNode, StringBuilder sb) {
sb.append(hbaseLoadNode.getRowKey()).append(" as rowkey,\n");
- List<FieldRelationShip> fieldRelationShips = hbaseLoadNode.getFieldRelationShips();
- Map<String, List<FieldRelationShip>> columnFamilyMapFields = genColumnFamilyMapFieldRelationShips(
- fieldRelationShips);
- for (Map.Entry<String, List<FieldRelationShip>> entry : columnFamilyMapFields.entrySet()) {
+ List<FieldRelation> fieldRelations = hbaseLoadNode.getFieldRelations();
+ Map<String, List<FieldRelation>> columnFamilyMapFields = genColumnFamilyMapFieldRelations(
+ fieldRelations);
+ for (Map.Entry<String, List<FieldRelation>> entry : columnFamilyMapFields.entrySet()) {
StringBuilder fieldAppend = new StringBuilder(" ROW(");
- for (FieldRelationShip fieldRelationShip : entry.getValue()) {
- FieldInfo fieldInfo = (FieldInfo) fieldRelationShip.getInputField();
+ for (FieldRelation fieldRelation : entry.getValue()) {
+ FieldInfo fieldInfo = (FieldInfo) fieldRelation.getInputField();
fieldAppend.append(fieldInfo.getName()).append(",");
}
if (fieldAppend.length() > 0) {
@@ -572,14 +573,14 @@ public class FlinkSqlParser implements Parser {
StringBuilder sb = new StringBuilder("CREATE TABLE `");
sb.append(node.genTableName()).append("`(\n");
sb.append("rowkey STRING,\n");
- List<FieldRelationShip> fieldRelationShips = node.getFieldRelationShips();
- Map<String, List<FieldRelationShip>> columnFamilyMapFields = genColumnFamilyMapFieldRelationShips(
- fieldRelationShips);
- for (Map.Entry<String, List<FieldRelationShip>> entry : columnFamilyMapFields.entrySet()) {
+ List<FieldRelation> fieldRelations = node.getFieldRelations();
+ Map<String, List<FieldRelation>> columnFamilyMapFields = genColumnFamilyMapFieldRelations(
+ fieldRelations);
+ for (Map.Entry<String, List<FieldRelation>> entry : columnFamilyMapFields.entrySet()) {
sb.append(entry.getKey());
StringBuilder fieldsAppend = new StringBuilder(" Row<");
- for (FieldRelationShip fieldRelationShip : entry.getValue()) {
- FieldInfo fieldInfo = fieldRelationShip.getOutputField();
+ for (FieldRelation fieldRelation : entry.getValue()) {
+ FieldInfo fieldInfo = fieldRelation.getOutputField();
fieldsAppend.append(fieldInfo.getName().split(":")[1]).append(" ")
.append(TableFormatUtils.deriveLogicalType(fieldInfo.getFormatInfo()).asSummaryString())
.append(",");
@@ -595,13 +596,13 @@ public class FlinkSqlParser implements Parser {
return sb.toString();
}
- private Map<String, List<FieldRelationShip>> genColumnFamilyMapFieldRelationShips(
- List<FieldRelationShip> fieldRelationShips) {
- Map<String, List<FieldRelationShip>> columnFamilyMapFields = new HashMap<>(16);
- for (FieldRelationShip fieldRelationShip : fieldRelationShips) {
- String columnFamily = fieldRelationShip.getOutputField().getName().split(":")[0];
+ private Map<String, List<FieldRelation>> genColumnFamilyMapFieldRelations(
+ List<FieldRelation> fieldRelations) {
+ Map<String, List<FieldRelation>> columnFamilyMapFields = new HashMap<>(16);
+ for (FieldRelation fieldRelation : fieldRelations) {
+ String columnFamily = fieldRelation.getOutputField().getName().split(":")[0];
columnFamilyMapFields.computeIfAbsent(columnFamily, v -> new ArrayList<>())
- .add(fieldRelationShip);
+ .add(fieldRelation);
}
return columnFamilyMapFields;
}
@@ -666,7 +667,10 @@ public class FlinkSqlParser implements Parser {
StringBuilder sb = new StringBuilder();
for (FieldInfo field : fields) {
sb.append(" `").append(field.getName()).append("` ");
- if (field instanceof BuiltInFieldInfo) {
+ if (field instanceof MetaFieldInfo) {
+ MetaFieldInfo metaFieldInfo = (MetaFieldInfo) field;
+ parseMetaField(node, metaFieldInfo, sb);
+ } else if (field instanceof BuiltInFieldInfo) {
BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) field;
parseMetaField(node, builtInFieldInfo, sb);
} else {
@@ -680,6 +684,7 @@ public class FlinkSqlParser implements Parser {
return sb.toString();
}
+ @Deprecated
private void parseMetaField(Node node, BuiltInFieldInfo metaField, StringBuilder sb) {
if (metaField.getBuiltInField() == BuiltInField.PROCESS_TIME) {
sb.append(" AS PROCTIME()");
@@ -700,6 +705,27 @@ public class FlinkSqlParser implements Parser {
}
}
+ private void parseMetaField(Node node, MetaFieldInfo metaFieldInfo, StringBuilder sb) {
+ if (metaFieldInfo.getMetaField() == MetaFieldInfo.MetaField.PROCESS_TIME) {
+ sb.append(" AS PROCTIME()");
+ return;
+ }
+ if (node instanceof MySqlExtractNode) {
+ sb.append(parseMySqlExtractNodeMetaField(metaFieldInfo));
+ } else if (node instanceof OracleExtractNode) {
+ sb.append(parseOracleExtractNodeMetaField(metaFieldInfo));
+ } else if (node instanceof KafkaExtractNode) {
+ sb.append(parseKafkaExtractNodeMetaField(metaFieldInfo));
+ } else if (node instanceof KafkaLoadNode) {
+ sb.append(parseKafkaLoadNodeMetaField(metaFieldInfo));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("This node:%s does not currently support metadata fields",
+ node.getClass().getName()));
+ }
+ }
+
+ @Deprecated
private String parseKafkaLoadNodeMetaField(BuiltInFieldInfo metaField) {
String metaType;
switch (metaField.getBuiltInField()) {
@@ -745,11 +771,59 @@ public class FlinkSqlParser implements Parser {
metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
break;
default:
- metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
+ throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+ metaField.getBuiltInField()));
+ }
+ return metaType;
+ }
+
+ private String parseKafkaLoadNodeMetaField(MetaFieldInfo metaFieldInfo) {
+ String metaType;
+ switch (metaFieldInfo.getMetaField()) {
+ case TABLE_NAME:
+ metaType = "STRING METADATA FROM 'value.table'";
+ break;
+ case DATABASE_NAME:
+ metaType = "STRING METADATA FROM 'value.database'";
+ break;
+ case OP_TS:
+ metaType = "TIMESTAMP(3) METADATA FROM 'value.event-timestamp'";
+ break;
+ case OP_TYPE:
+ metaType = "STRING METADATA FROM 'value.op-type'";
+ break;
+ case DATA:
+ metaType = "STRING METADATA FROM 'value.data'";
+ break;
+ case IS_DDL:
+ metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
+ break;
+ case TS:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
+ break;
+ case SQL_TYPE:
+ metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
+ break;
+ case MYSQL_TYPE:
+ metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
+ break;
+ case PK_NAMES:
+ metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
+ break;
+ case BATCH_ID:
+ metaType = "BIGINT METADATA FROM 'value.batch-id'";
+ break;
+ case UPDATE_BEFORE:
+ metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+ metaFieldInfo.getMetaField()));
}
return metaType;
}
+ @Deprecated
private String parseKafkaExtractNodeMetaField(BuiltInFieldInfo metaField) {
String metaType;
switch (metaField.getBuiltInField()) {
@@ -793,7 +867,98 @@ public class FlinkSqlParser implements Parser {
metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
break;
default:
- metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
+ throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+ metaField.getBuiltInField()));
+ }
+ return metaType;
+ }
+
+ private String parseKafkaExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
+ String metaType;
+ switch (metaFieldInfo.getMetaField()) {
+ case TABLE_NAME:
+ metaType = "STRING METADATA FROM 'value.table'";
+ break;
+ case DATABASE_NAME:
+ metaType = "STRING METADATA FROM 'value.database'";
+ break;
+ case SQL_TYPE:
+ metaType = "MAP<STRING, INT> METADATA FROM 'value.sql-type'";
+ break;
+ case PK_NAMES:
+ metaType = "ARRAY<STRING> METADATA FROM 'value.pk-names'";
+ break;
+ case TS:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'";
+ break;
+ case OP_TS:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.event-timestamp'";
+ break;
+ // additional metadata
+ case OP_TYPE:
+ metaType = "STRING METADATA FROM 'value.op-type'";
+ break;
+ case IS_DDL:
+ metaType = "BOOLEAN METADATA FROM 'value.is-ddl'";
+ break;
+ case MYSQL_TYPE:
+ metaType = "MAP<STRING, STRING> METADATA FROM 'value.mysql-type'";
+ break;
+ case BATCH_ID:
+ metaType = "BIGINT METADATA FROM 'value.batch-id'";
+ break;
+ case UPDATE_BEFORE:
+ metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update-before'";
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+ metaFieldInfo.getMetaField()));
+ }
+ return metaType;
+ }
+
+ private String parseMySqlExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
+ String metaType;
+ switch (metaFieldInfo.getMetaField()) {
+ case TABLE_NAME:
+ metaType = "STRING METADATA FROM 'meta.table_name' VIRTUAL";
+ break;
+ case DATABASE_NAME:
+ metaType = "STRING METADATA FROM 'meta.database_name' VIRTUAL";
+ break;
+ case OP_TS:
+ metaType = "TIMESTAMP(3) METADATA FROM 'meta.op_ts' VIRTUAL";
+ break;
+ case OP_TYPE:
+ metaType = "STRING METADATA FROM 'meta.op_type' VIRTUAL";
+ break;
+ case DATA:
+ metaType = "STRING METADATA FROM 'meta.data' VIRTUAL";
+ break;
+ case IS_DDL:
+ metaType = "BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL";
+ break;
+ case TS:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL";
+ break;
+ case SQL_TYPE:
+ metaType = "MAP<STRING, INT> METADATA FROM 'meta.sql_type' VIRTUAL";
+ break;
+ case MYSQL_TYPE:
+ metaType = "MAP<STRING, STRING> METADATA FROM 'meta.mysql_type' VIRTUAL";
+ break;
+ case PK_NAMES:
+ metaType = "ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL";
+ break;
+ case BATCH_ID:
+ metaType = "BIGINT METADATA FROM 'meta.batch_id' VIRTUAL";
+ break;
+ case UPDATE_BEFORE:
+ metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before' VIRTUAL";
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+ metaFieldInfo.getMetaField()));
}
return metaType;
}
@@ -843,11 +1008,13 @@ public class FlinkSqlParser implements Parser {
metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before' VIRTUAL";
break;
default:
- metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
+ throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+ metaField.getBuiltInField()));
}
return metaType;
}
+ @Deprecated
private String parseOracleExtractNodeMetaField(BuiltInFieldInfo metaField) {
String metaType;
switch (metaField.getBuiltInField()) {
@@ -864,7 +1031,30 @@ public class FlinkSqlParser implements Parser {
metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL";
break;
default:
- metaType = TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSummaryString();
+ throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+ metaField.getBuiltInField()));
+ }
+ return metaType;
+ }
+
+ private String parseOracleExtractNodeMetaField(MetaFieldInfo metaFieldInfo) {
+ String metaType;
+ switch (metaFieldInfo.getMetaField()) {
+ case TABLE_NAME:
+ metaType = "STRING METADATA FROM 'table_name' VIRTUAL";
+ break;
+ case SCHEMA_NAME:
+ metaType = "STRING METADATA FROM 'schema_name' VIRTUAL";
+ break;
+ case DATABASE_NAME:
+ metaType = "STRING METADATA FROM 'database_name' VIRTUAL";
+ break;
+ case OP_TS:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL";
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("Unsupport meta field: %s",
+ metaFieldInfo.getMetaField()));
}
return metaType;
}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index 7fb4d6fe4..3a6057517 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -31,8 +31,8 @@ import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -62,8 +62,8 @@ public class AllMigrateTest {
private KafkaLoadNode buildAllMigrateKafkaNode() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("data", new StringFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("data", new StringFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("data", new StringFormatInfo()),
new FieldInfo("data", new StringFormatInfo())));
CsvFormat csvFormat = new CsvFormat();
csvFormat.setDisableQuoteCharacter(true);
@@ -73,10 +73,10 @@ public class AllMigrateTest {
null, null);
}
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
index f0b5c31d2..5c0c9893e 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ClickHouseSqlParserTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -63,10 +63,10 @@ public class ClickHouseSqlParserTest {
private ClickHouseLoadNode buildClickHouseLoadNode(String id) {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("name", new StringFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())));
return new ClickHouseLoadNode(id, "test_clickhouse",
@@ -90,10 +90,10 @@ public class ClickHouseSqlParserTest {
* @param outputs load node
* @return node relation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
index 5f1192922..bc538e3dd 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DistinctNodeSqlParseTest.java
@@ -39,13 +39,13 @@ import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.OrderDirection;
import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -105,14 +105,14 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
@@ -127,14 +127,14 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
@@ -149,14 +149,14 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new KafkaLoadNode("3", "kafka_output", fields, relations, null,
@@ -174,13 +174,13 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo())
),
Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
),
null, null,
@@ -198,13 +198,13 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo())
),
Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
),
null, null,
@@ -222,13 +222,13 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo())
),
Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
),
null, null,
@@ -237,10 +237,10 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
OrderDirection.ASC);
}
- public NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ public NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
index ef4a562ba..59f2394c3 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FilterParseTest.java
@@ -37,14 +37,14 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.LessThanOperator;
import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -78,14 +78,14 @@ public class FilterParseTest extends AbstractTestBase {
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())
);
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
List<FilterFunction> filters = Arrays.asList(
@@ -102,10 +102,10 @@ public class FilterParseTest extends AbstractTestBase {
null, "id");
}
- public NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ public NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
index b81a54c92..586488a80 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
@@ -39,12 +39,12 @@ import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -96,14 +96,14 @@ public class FlinkSqlParserTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new KafkaLoadNode(id, "kafka_output", fields, relations, null, null,
@@ -112,10 +112,10 @@ public class FlinkSqlParserTest extends AbstractTestBase {
null, null);
}
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
private HiveLoadNode buildHiveNode(String id) {
@@ -124,14 +124,14 @@ public class FlinkSqlParserTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new HiveLoadNode(id, "hive_output",
@@ -147,14 +147,14 @@ public class FlinkSqlParserTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new FileSystemLoadNode(id, "hdfs_output", fields, relations,
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
index cd629ee6c..ae3ae8b68 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FullOuterJoinSqlParseTest.java
@@ -39,7 +39,7 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.OrderDirection;
import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -48,8 +48,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.FullOuterJoinRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.FullOuterJoinRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -61,7 +61,7 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
/**
- * Test for {@link FullOuterJoinRelationShip}
+ * Test for {@link FullOuterJoinRelation}
*/
public class FullOuterJoinSqlParseTest extends AbstractTestBase {
@@ -118,16 +118,16 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
new FieldInfo("salary", new TimestampFormatInfo()))
);
return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -149,15 +149,15 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())
), Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
), null, null);
}
@@ -183,17 +183,17 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())
), Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
new FieldInfo("salary", new TimestampFormatInfo()))
), filters, null,
Collections.singletonList(new FieldInfo("name", "1", new StringFormatInfo())),
@@ -205,12 +205,12 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
*
* @param inputs The inputs that is the id list of input nodes
* @param outputs The outputs that is the id list of output nodes
- * @return A NodeRelationShip
+ * @return A NodeRelation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
@@ -218,9 +218,9 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
*
* @param inputs The inputs that is the id list of input nodes
* @param outputs The outputs that is the id list of output nodes
- * @return A FullOuterJoinRelationShip
+ * @return A FullOuterJoinRelation
*/
- private NodeRelationShip buildFullOuterJoinRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildFullOuterJoinRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -230,7 +230,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
conditionMap.put("3", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
new FieldInfo("id", "1", new LongFormatInfo()), EqualOperator.getInstance(),
new FieldInfo("id", "3", new LongFormatInfo()))));
- return new FullOuterJoinRelationShip(inputIds, outputIds, conditionMap);
+ return new FullOuterJoinRelation(inputIds, outputIds, conditionMap);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
index cb8e6282a..19feacd00 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HbaseLoadFlinkSqlParseTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -74,9 +74,9 @@ public class HbaseLoadFlinkSqlParseTest extends AbstractTestBase {
return new HbaseLoadNode("2", "test_hbase",
Arrays.asList(new FieldInfo("cf:age", new LongFormatInfo()), new FieldInfo("cf:name",
new StringFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("age", new LongFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("age", new LongFormatInfo()),
new FieldInfo("cf:age", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("cf:name", new StringFormatInfo()))), null, null, 1, null, "mytable",
"default",
"localhost:2181", "MD5(`name`)", null, null, null, null);
@@ -89,10 +89,10 @@ public class HbaseLoadFlinkSqlParseTest extends AbstractTestBase {
* @param outputs load node
* @return node relation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
index f59a5d397..565d32633 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
@@ -36,8 +36,8 @@ import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -75,14 +75,14 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("salary", new StringFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
@@ -108,14 +108,14 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
);
@@ -144,10 +144,10 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
* @param outputs load node
* @return node relation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
@Test
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
similarity index 87%
rename from inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
rename to inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
index 7fae01782..e744348a3 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationShipSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/InnerJoinRelationSqlParseTest.java
@@ -39,7 +39,7 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.OrderDirection;
import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -48,8 +48,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.InnerJoinNodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -61,9 +61,9 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
/**
- * Test for {@link InnerJoinNodeRelationShip}
+ * Test for {@link InnerJoinNodeRelation}
*/
-public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
+public class InnerJoinRelationSqlParseTest extends AbstractTestBase {
/**
* Build the first kafka extract node
@@ -118,16 +118,16 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
new FieldInfo("salary", new TimestampFormatInfo()))
);
return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -147,16 +147,16 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
new FieldInfo("salary", new TimestampFormatInfo()))
);
return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -178,15 +178,15 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())
), Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
), null, null);
}
@@ -212,17 +212,17 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())
), Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
new FieldInfo("salary", new TimestampFormatInfo()))
), filters, null,
Collections.singletonList(new FieldInfo("name", "1", new StringFormatInfo())),
@@ -234,12 +234,12 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
*
* @param inputs The inputs that is the id list of input nodes
* @param outputs The outputs that is the id list of output nodes
- * @return A NodeRelationShip
+ * @return A NodeRelation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
@@ -247,9 +247,9 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
*
* @param inputs The inputs that is the id list of input nodes
* @param outputs The outputs that is the id list of output nodes
- * @return A InnerJoinNodeRelationShip
+ * @return A InnerJoinNodeRelation
*/
- private NodeRelationShip buildInnerJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildInnerJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -259,7 +259,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
conditionMap.put("3", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
new FieldInfo("id", "1", new LongFormatInfo()), EqualOperator.getInstance(),
new FieldInfo("id", "3", new LongFormatInfo()))));
- return new InnerJoinNodeRelationShip(inputIds, outputIds, conditionMap);
+ return new InnerJoinNodeRelation(inputIds, outputIds, conditionMap);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
index 5758372fc..32b3dd915 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/LeftOuterJoinSqlParseTest.java
@@ -40,7 +40,7 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.OrderDirection;
import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -49,8 +49,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.LeftOuterJoinNodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -62,7 +62,7 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
/**
- * Test for {@link LeftOuterJoinNodeRelationShip}
+ * Test for {@link LeftOuterJoinNodeRelation}
*/
public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
@@ -119,16 +119,16 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
new FieldInfo("salary", new TimestampFormatInfo()))
);
return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -150,15 +150,15 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())
), Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
), null, null);
}
@@ -184,17 +184,17 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())
), Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
new FieldInfo("salary", new TimestampFormatInfo()))
), filters, FilterStrategy.RETAIN,
Collections.singletonList(new FieldInfo("name", "1", new StringFormatInfo())),
@@ -206,12 +206,12 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
*
* @param inputs The inputs that is the id list of input nodes
* @param outputs The outputs that is the id list of output nodes
- * @return A NodeRelationShip
+ * @return A NodeRelation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
@@ -219,9 +219,9 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
*
* @param inputs The inputs that is the id list of input nodes
* @param outputs The outputs that is the id list of output nodes
- * @return A LeftOuterJoinNodeRelationShip
+ * @return A LeftOuterJoinNodeRelation
*/
- private NodeRelationShip buildLeftOuterJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildLeftOuterJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -231,7 +231,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
conditionMap.put("3", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
new FieldInfo("id", "1", new LongFormatInfo()), EqualOperator.getInstance(),
new FieldInfo("id", "3", new LongFormatInfo()))));
- return new LeftOuterJoinNodeRelationShip(inputIds, outputIds, conditionMap);
+ return new LeftOuterJoinNodeRelation(inputIds, outputIds, conditionMap);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
index 252ca4f0e..045fba0c7 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MetaFieldSyncTest.java
@@ -42,8 +42,8 @@ import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -119,36 +119,36 @@ public class MetaFieldSyncTest extends AbstractTestBase {
new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
);
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("database", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("database", new TimestampFormatInfo()),
new FieldInfo("database", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("table", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("table", new TimestampFormatInfo()),
new FieldInfo("table", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("pk_names", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("pk_names", new TimestampFormatInfo()),
new FieldInfo("pk_names", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("event_time", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("event_time", new TimestampFormatInfo()),
new FieldInfo("event_time", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("event_type", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("event_type", new TimestampFormatInfo()),
new FieldInfo("event_type", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("isddl", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("isddl", new TimestampFormatInfo()),
new FieldInfo("isddl", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("batch_id", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("batch_id", new TimestampFormatInfo()),
new FieldInfo("batch_id", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("mysql_type", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("mysql_type", new TimestampFormatInfo()),
new FieldInfo("mysql_type", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("sql_type", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("sql_type", new TimestampFormatInfo()),
new FieldInfo("sql_type", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("meta_ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("meta_ts", new TimestampFormatInfo()),
new FieldInfo("meta_ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("up_before", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("up_before", new TimestampFormatInfo()),
new FieldInfo("up_before", new TimestampFormatInfo()))
);
return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
@@ -219,36 +219,36 @@ public class MetaFieldSyncTest extends AbstractTestBase {
new BuiltInFieldInfo("up_before", new MapFormatInfo(new StringFormatInfo(),
new StringFormatInfo()), BuiltInField.METADATA_UPDATE_BEFORE)
);
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("database", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("database", new TimestampFormatInfo()),
new FieldInfo("database", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("table", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("table", new TimestampFormatInfo()),
new FieldInfo("table", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("pk_names", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("pk_names", new TimestampFormatInfo()),
new FieldInfo("pk_names", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("event_time", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("event_time", new TimestampFormatInfo()),
new FieldInfo("event_time", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("event_type", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("event_type", new TimestampFormatInfo()),
new FieldInfo("event_type", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("isddl", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("isddl", new TimestampFormatInfo()),
new FieldInfo("isddl", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("batch_id", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("batch_id", new TimestampFormatInfo()),
new FieldInfo("batch_id", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("mysql_type", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("mysql_type", new TimestampFormatInfo()),
new FieldInfo("mysql_type", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("sql_type", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("sql_type", new TimestampFormatInfo()),
new FieldInfo("sql_type", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("meta_ts", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("meta_ts", new TimestampFormatInfo()),
new FieldInfo("meta_ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("up_before", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("up_before", new TimestampFormatInfo()),
new FieldInfo("up_before", new TimestampFormatInfo()))
);
return new KafkaLoadNode("4", "kafka_output2", fields, relations, null,
@@ -257,10 +257,10 @@ public class MetaFieldSyncTest extends AbstractTestBase {
null, "id");
}
- public NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ public NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
index da114700c..7ed02fcd7 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MongoExtractFlinkSqlParseTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -60,10 +60,10 @@ public class MongoExtractFlinkSqlParseTest extends AbstractTestBase {
private KafkaLoadNode buildAllMigrateKafkaNode() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("_id", new StringFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("_id", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("_id", new StringFormatInfo()),
new FieldInfo("_id", new StringFormatInfo())));
CsvFormat csvFormat = new CsvFormat();
csvFormat.setDisableQuoteCharacter(true);
@@ -73,10 +73,10 @@ public class MongoExtractFlinkSqlParseTest extends AbstractTestBase {
null, "_id");
}
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
index 20f944cb9..f180b68e4 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/MySqlLoadSqlParseTest.java
@@ -33,8 +33,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -67,12 +67,12 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("age", new IntFormatInfo())
);
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo()))
);
return new MySqlLoadNode("2", "mysql_output", fields, relations, null,
@@ -87,10 +87,10 @@ public class MySqlLoadSqlParseTest extends AbstractTestBase {
* @param outputs load node
* @return node relation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
index c5106f6fb..f18eafd4b 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleExtractSqlParseTest.java
@@ -34,8 +34,8 @@ import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -70,12 +70,12 @@ public class OracleExtractSqlParseTest extends AbstractTestBase {
new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("age", new IntFormatInfo())
);
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("ID", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("ID", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("NAME", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("NAME", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("AGE", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("AGE", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo()))
);
return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
@@ -91,10 +91,10 @@ public class OracleExtractSqlParseTest extends AbstractTestBase {
* @param outputs load node
* @return node relation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
index b7553c183..d6beb8023 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/OracleLoadSqlParseTest.java
@@ -33,8 +33,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -67,12 +67,12 @@ public class OracleLoadSqlParseTest extends AbstractTestBase {
new FieldInfo("NAME", new StringFormatInfo()),
new FieldInfo("AGE", new IntFormatInfo())
);
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("ID", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("NAME", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("AGE", new IntFormatInfo()))
);
return new OracleLoadNode("2", "oracle_output", fields, relations, null,
@@ -87,10 +87,10 @@ public class OracleLoadSqlParseTest extends AbstractTestBase {
* @param outputs load node
* @return node relation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
index 3dec4f593..25bb170e2 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresExtractFlinkSqlParseTest.java
@@ -33,8 +33,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -70,9 +70,9 @@ public class PostgresExtractFlinkSqlParseTest extends AbstractTestBase {
return new HbaseLoadNode("2", "hbase_output",
Arrays.asList(new FieldInfo("cf:age", new LongFormatInfo()), new FieldInfo("cf:name",
new StringFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("age", new LongFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("age", new LongFormatInfo()),
new FieldInfo("cf:age", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("cf:name", new StringFormatInfo()))), null, null, 1, null, "user",
"default",
"localhost:2181", "MD5(`name`)", null, "/hbase", null, null);
@@ -85,10 +85,10 @@ public class PostgresExtractFlinkSqlParseTest extends AbstractTestBase {
* @param outputs load node
* @return node relation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
index 49317f3f1..3dc17c7f2 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PostgresLoadNodeFlinkSqlParseTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -73,9 +73,9 @@ public class PostgresLoadNodeFlinkSqlParseTest extends AbstractTestBase {
private PostgresLoadNode buildPostgresLoadNode() {
return new PostgresLoadNode("2", "postgres_output", Arrays.asList(new FieldInfo("name",
new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo()))), null, null, 1, null,
"jdbc:postgresql://localhost:5432/postgres",
"postgres",
@@ -91,10 +91,10 @@ public class PostgresLoadNodeFlinkSqlParseTest extends AbstractTestBase {
* @param outputs load node
* @return node relation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
index d28e788d0..8b3a27e22 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/PulsarSqlParserTest.java
@@ -34,8 +34,8 @@ import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -49,10 +49,10 @@ public class PulsarSqlParserTest {
private KafkaLoadNode buildKafkaLoadNode() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("name", new StringFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())));
return new KafkaLoadNode("1", "kafka_output", fields, relations, null, null,
"workerJson", "localhost:9092",
@@ -77,10 +77,10 @@ public class PulsarSqlParserTest {
null);
}
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
@Test
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
index 51af295fd..d94387a86 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/RightOuterJoinSqlParseTest.java
@@ -39,7 +39,7 @@ import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
import org.apache.inlong.sort.protocol.transformation.OrderDirection;
import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@@ -48,8 +48,8 @@ import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.MoreThanOrEqualOperator;
import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.RightOuterJoinNodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -61,7 +61,7 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
/**
- * Test for {@link RightOuterJoinNodeRelationShip}
+ * Test for {@link RightOuterJoinNodeRelation}
*/
public class RightOuterJoinSqlParseTest extends AbstractTestBase {
@@ -118,16 +118,16 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
new FieldInfo("salary", new TimestampFormatInfo()))
);
return new KafkaLoadNode("5", "kafka_output", fields, relations, null,
@@ -149,15 +149,15 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())
), Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo()))
), null, null);
}
@@ -183,17 +183,17 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("salary", new FloatFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())
), Arrays.asList(
- new FieldRelationShip(new FieldInfo("id", "1", new LongFormatInfo()),
+ new FieldRelation(new FieldInfo("id", "1", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", "1", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", "1", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", "2", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", "2", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("ts", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("ts", "3", new TimestampFormatInfo()),
new FieldInfo("ts", new TimestampFormatInfo())),
- new FieldRelationShip(new FieldInfo("salary", "3", new TimestampFormatInfo()),
+ new FieldRelation(new FieldInfo("salary", "3", new TimestampFormatInfo()),
new FieldInfo("salary", new TimestampFormatInfo()))
), filters, null,
Collections.singletonList(new FieldInfo("name", "1", new StringFormatInfo())),
@@ -205,12 +205,12 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
*
* @param inputs The inputs that is the id list of input nodes
* @param outputs The outputs that is the id list of output nodes
- * @return A NodeRelationShip
+ * @return A NodeRelation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
@@ -218,9 +218,9 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
*
* @param inputs The inputs that is the id list of input nodes
* @param outputs The outputs that is the id list of output nodes
- * @return A RightOuterJoinNodeRelationShip
+ * @return A RightOuterJoinNodeRelation
*/
- private NodeRelationShip buildRightOuterJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildRightOuterJoinNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
Map<String, List<FilterFunction>> conditionMap = new TreeMap<>();
@@ -230,7 +230,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
conditionMap.put("3", Collections.singletonList(new SingleValueFilterFunction(EmptyOperator.getInstance(),
new FieldInfo("id", "1", new LongFormatInfo()), EqualOperator.getInstance(),
new FieldInfo("id", "3", new LongFormatInfo()))));
- return new RightOuterJoinNodeRelationShip(inputIds, outputIds, conditionMap);
+ return new RightOuterJoinNodeRelation(inputIds, outputIds, conditionMap);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
index 43b5a2e50..e0e86a4f6 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/SqlServerNodeSqlParseTest.java
@@ -33,8 +33,8 @@ import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -83,10 +83,10 @@ public class SqlServerNodeSqlParseTest extends AbstractTestBase {
private KafkaLoadNode buildKafkaNode(String id) {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("val_char", new StringFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("val_char", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("val_char", new StringFormatInfo()),
new FieldInfo("val_char", new StringFormatInfo()))
);
return new KafkaLoadNode(id, "kafka_output", fields, relations, null, null,
@@ -101,10 +101,10 @@ public class SqlServerNodeSqlParseTest extends AbstractTestBase {
private SqlServerLoadNode buildSqlServerLoadNode(String id) {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("name", new StringFormatInfo()));
- List<FieldRelationShip> relations = Arrays
- .asList(new FieldRelationShip(new FieldInfo("id", new LongFormatInfo()),
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
new FieldInfo("id", new LongFormatInfo())),
- new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo()))
);
return new SqlServerLoadNode(id, "sqlserver_out", fields, relations, null, null, 1,
@@ -115,10 +115,10 @@ public class SqlServerNodeSqlParseTest extends AbstractTestBase {
/**
* Build node relation.
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
index 22033f8f5..8622c1b47 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/TDSQLPostgresLoadNodeFlinkSqlParseTest.java
@@ -32,8 +32,8 @@ import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
-import org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.junit.Assert;
import org.junit.Test;
@@ -73,9 +73,9 @@ public class TDSQLPostgresLoadNodeFlinkSqlParseTest extends AbstractTestBase {
private TDSQLPostgresLoadNode buildTDSQLPostgresLoadNode() {
return new TDSQLPostgresLoadNode("2", "tdsqlPostgres_output", Arrays.asList(new FieldInfo("name",
new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo())),
- Arrays.asList(new FieldRelationShip(new FieldInfo("name", new StringFormatInfo()),
+ Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("name", new StringFormatInfo())),
- new FieldRelationShip(new FieldInfo("age", new IntFormatInfo()),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
new FieldInfo("age", new IntFormatInfo()))), null, null, 1, null,
"jdbc:postgresql://localhost:5432/tdsql",
"tdsqlpostgres",
@@ -91,10 +91,10 @@ public class TDSQLPostgresLoadNodeFlinkSqlParseTest extends AbstractTestBase {
* @param outputs load node
* @return node relation
*/
- private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
- return new NodeRelationShip(inputIds, outputIds);
+ return new NodeRelation(inputIds, outputIds);
}
/**