You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/15 06:32:51 UTC
[inlong] branch master updated: [INLONG-7589][Sort] Support multi node relation with same output but different input nodes (#7590)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d8f135491 [INLONG-7589][Sort] Support multi node relation with same output but different input nodes (#7590)
d8f135491 is described below
commit d8f135491501c0fb72892dbebc44f394d1ea1b16
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Mar 15 14:32:45 2023 +0800
[INLONG-7589][Sort] Support multi node relation with same output but different input nodes (#7590)
---
.../inlong/sort/parser/impl/FlinkSqlParser.java | 47 ++++++++++++---------
.../apache/inlong/sort/parser/AllMigrateTest.java | 49 +++++++++++++++++++++-
2 files changed, 76 insertions(+), 20 deletions(-)
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 64fe0dd98..af0a9c693 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -226,13 +226,36 @@ public class FlinkSqlParser implements Parser {
"relation must have at least one output node");
relation.getOutputs().forEach(s -> {
Preconditions.checkNotNull(s, "node id in outputs is null");
- Node node = nodeMap.get(s);
- Preconditions.checkNotNull(node, "can not find any node by node id " + s);
- parseNode(node, relation, nodeMap, relationMap);
+ Node outputNode = nodeMap.get(s);
+ Preconditions.checkNotNull(outputNode, "can not find any node by node id " + s);
+ parseInputNodes(relation, nodeMap, relationMap);
+ parseSingleNode(outputNode, relation, nodeMap);
+ // for Load node we need to generate insert sql
+ if (outputNode instanceof LoadNode) {
+ insertSqls.add(genLoadNodeInsertSql((LoadNode) outputNode, relation, nodeMap));
+ }
});
log.info("parse node relation success, relation:{}", relation);
}
+ /**
+ * Parse every input node of the given relation that has not been parsed yet
+ * @param relation Define relations between nodes, it also shows the data flow
+ * @param nodeMap Store the mapping relation between node id and node
+ * @param relationMap Store the mapping relation between node id and relation
+ */
+ private void parseInputNodes(NodeRelation relation, Map<String, Node> nodeMap,
+ Map<String, NodeRelation> relationMap) {
+ for (String upstreamNodeId : relation.getInputs()) {
+ if (!hasParsedSet.contains(upstreamNodeId)) {
+ Node upstreamNode = nodeMap.get(upstreamNodeId);
+ Preconditions.checkNotNull(upstreamNode,
+ "can not find any node by node id " + upstreamNodeId);
+ parseSingleNode(upstreamNode, relationMap.get(upstreamNodeId), nodeMap);
+ }
+ }
+ }
+
private void registerTableSql(Node node, String sql) {
if (node instanceof ExtractNode) {
extractTableSqls.add(sql);
@@ -246,15 +269,13 @@ public class FlinkSqlParser implements Parser {
}
/**
- * Parse a node and recursively resolve its dependent nodes
+ * Parse a single node and generate the corresponding sql
*
* @param node The abstract of extract, transform, load
* @param relation Define relations between nodes, it also shows the data flow
* @param nodeMap store the mapping relation between node id and node
- * @param relationMap Store the mapping relation between node id and relation
*/
- private void parseNode(Node node, NodeRelation relation, Map<String, Node> nodeMap,
- Map<String, NodeRelation> relationMap) {
+ private void parseSingleNode(Node node, NodeRelation relation, Map<String, Node> nodeMap) {
if (hasParsedSet.contains(node.getId())) {
log.warn("the node has already been parsed, node id:{}", node.getId());
return;
@@ -267,22 +288,10 @@ public class FlinkSqlParser implements Parser {
hasParsedSet.add(node.getId());
} else {
Preconditions.checkNotNull(relation, "relation is null");
- for (String upstreamNodeId : relation.getInputs()) {
- if (!hasParsedSet.contains(upstreamNodeId)) {
- Node upstreamNode = nodeMap.get(upstreamNodeId);
- Preconditions.checkNotNull(upstreamNode,
- "can not find any node by node id " + upstreamNodeId);
- parseNode(upstreamNode, relationMap.get(upstreamNodeId), nodeMap, relationMap);
- }
- }
if (node instanceof LoadNode) {
String createSql = genCreateSql(node);
log.info("node id:{}, create table sql:\n{}", node.getId(), createSql);
registerTableSql(node, createSql);
- LoadNode loadNode = (LoadNode) node;
- String insertSql = genLoadNodeInsertSql(loadNode, relation, nodeMap);
- log.info("node id:{}, insert sql:\n{}", node.getId(), insertSql);
- insertSqls.add(insertSql);
hasParsedSet.add(node.getId());
} else if (node instanceof TransformNode) {
TransformNode transformNode = (TransformNode) node;
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index a52730578..0b5dfb6f4 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -66,6 +66,23 @@ public class AllMigrateTest {
ExtractMode.CDC, null, null);
}
+ private MySqlExtractNode buildAllMigrateExtractNode2() {
+
+ Map<String, String> option = new HashMap<>();
+ option.put("append-mode", "false");
+ option.put("migrate-all", "true");
+ List<String> tables = new ArrayList(10);
+ tables.add("test.*");
+ List<FieldInfo> fields = Collections.singletonList(
+ new MetaFieldInfo("data", MetaField.DATA));
+
+ return new MySqlExtractNode("2", "mysql_input", fields,
+ null, option, null,
+ tables, "localhost", "root", "inlong",
+ "test", null, null, true, null,
+ ExtractMode.CDC, null, null);
+ }
+
private MySqlExtractNode buildAllMigrateExtractNodeWithBytesFormat() {
List<FieldInfo> fields = Collections.singletonList(
new MetaFieldInfo("data", MetaField.DATA_BYTES_DEBEZIUM));
@@ -101,7 +118,7 @@ public class AllMigrateTest {
new FieldInfo("data", new StringFormatInfo())));
CsvFormat csvFormat = new CsvFormat();
csvFormat.setDisableQuoteCharacter(true);
- return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
+ return new KafkaLoadNode("3", "kafka_output", fields, relations, null, null,
"topic", "localhost:9092",
csvFormat, null,
null, null);
@@ -140,6 +157,36 @@ public class AllMigrateTest {
Assert.assertTrue(result.tryExecute());
}
+ /**
+ * Test all migrate with two input nodes and one output node (two relations)
+ *
+ * @throws Exception The exception that may be thrown when executing the case
+ */
+ @Test
+ public void testAllMigrateMultiRelations() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildAllMigrateExtractNode();
+ Node inputNode2 = buildAllMigrateExtractNode2();
+ Node outputNode = buildAllMigrateKafkaNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, inputNode2, outputNode),
+ Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode)),
+ buildNodeRelation(Collections.singletonList(inputNode2),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
/**
* Test all migrate, the full database data is represented as bytes of canal json
*