You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/15 06:32:51 UTC

[inlong] branch master updated: [INLONG-7589][Sort] Support multi node relation with same output but different input nodes (#7590)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f135491 [INLONG-7589][Sort] Support multi node relation with same output but different input nodes (#7590)
d8f135491 is described below

commit d8f135491501c0fb72892dbebc44f394d1ea1b16
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Wed Mar 15 14:32:45 2023 +0800

    [INLONG-7589][Sort] Support multi node relation with same output but different input nodes (#7590)
---
 .../inlong/sort/parser/impl/FlinkSqlParser.java    | 47 ++++++++++++---------
 .../apache/inlong/sort/parser/AllMigrateTest.java  | 49 +++++++++++++++++++++-
 2 files changed, 76 insertions(+), 20 deletions(-)

diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index 64fe0dd98..af0a9c693 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -226,13 +226,36 @@ public class FlinkSqlParser implements Parser {
                 "relation must have at least one output node");
         relation.getOutputs().forEach(s -> {
             Preconditions.checkNotNull(s, "node id in outputs is null");
-            Node node = nodeMap.get(s);
-            Preconditions.checkNotNull(node, "can not find any node by node id " + s);
-            parseNode(node, relation, nodeMap, relationMap);
+            Node outputNode = nodeMap.get(s);
+            Preconditions.checkNotNull(outputNode, "can not find any node by node id " + s);
+            parseInputNodes(relation, nodeMap, relationMap);
+            parseSingleNode(outputNode, relation, nodeMap);
+            // for Load node we need to generate insert sql
+            if (outputNode instanceof LoadNode) {
+                insertSqls.add(genLoadNodeInsertSql((LoadNode) outputNode, relation, nodeMap));
+            }
         });
         log.info("parse node relation success, relation:{}", relation);
     }
 
+    /**
+     * Parse every input node of the given relation that has not been parsed yet
+     * @param relation Define relations between nodes, it also shows the data flow
+     * @param nodeMap Store the mapping relation between node id and node
+     * @param relationMap Store the mapping relation between node id and relation
+     */
+    private void parseInputNodes(NodeRelation relation, Map<String, Node> nodeMap,
+            Map<String, NodeRelation> relationMap) {
+        for (String upstreamNodeId : relation.getInputs()) {
+            if (!hasParsedSet.contains(upstreamNodeId)) {
+                Node upstreamNode = nodeMap.get(upstreamNodeId);
+                Preconditions.checkNotNull(upstreamNode,
+                        "can not find any node by node id " + upstreamNodeId);
+                parseSingleNode(upstreamNode, relationMap.get(upstreamNodeId), nodeMap);
+            }
+        }
+    }
+
     private void registerTableSql(Node node, String sql) {
         if (node instanceof ExtractNode) {
             extractTableSqls.add(sql);
@@ -246,15 +269,13 @@ public class FlinkSqlParser implements Parser {
     }
 
     /**
-     * Parse a node and recursively resolve its dependent nodes
+     * Parse a single node and generate the corresponding sql
      *
      * @param node The abstract of extract, transform, load
      * @param relation Define relations between nodes, it also shows the data flow
      * @param nodeMap store the mapping relation between node id and node
-     * @param relationMap Store the mapping relation between node id and relation
      */
-    private void parseNode(Node node, NodeRelation relation, Map<String, Node> nodeMap,
-            Map<String, NodeRelation> relationMap) {
+    private void parseSingleNode(Node node, NodeRelation relation, Map<String, Node> nodeMap) {
         if (hasParsedSet.contains(node.getId())) {
             log.warn("the node has already been parsed, node id:{}", node.getId());
             return;
@@ -267,22 +288,10 @@ public class FlinkSqlParser implements Parser {
             hasParsedSet.add(node.getId());
         } else {
             Preconditions.checkNotNull(relation, "relation is null");
-            for (String upstreamNodeId : relation.getInputs()) {
-                if (!hasParsedSet.contains(upstreamNodeId)) {
-                    Node upstreamNode = nodeMap.get(upstreamNodeId);
-                    Preconditions.checkNotNull(upstreamNode,
-                            "can not find any node by node id " + upstreamNodeId);
-                    parseNode(upstreamNode, relationMap.get(upstreamNodeId), nodeMap, relationMap);
-                }
-            }
             if (node instanceof LoadNode) {
                 String createSql = genCreateSql(node);
                 log.info("node id:{}, create table sql:\n{}", node.getId(), createSql);
                 registerTableSql(node, createSql);
-                LoadNode loadNode = (LoadNode) node;
-                String insertSql = genLoadNodeInsertSql(loadNode, relation, nodeMap);
-                log.info("node id:{}, insert sql:\n{}", node.getId(), insertSql);
-                insertSqls.add(insertSql);
                 hasParsedSet.add(node.getId());
             } else if (node instanceof TransformNode) {
                 TransformNode transformNode = (TransformNode) node;
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index a52730578..0b5dfb6f4 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -66,6 +66,23 @@ public class AllMigrateTest {
                 ExtractMode.CDC, null, null);
     }
 
+    private MySqlExtractNode buildAllMigrateExtractNode2() {
+
+        Map<String, String> option = new HashMap<>();
+        option.put("append-mode", "false");
+        option.put("migrate-all", "true");
+        List<String> tables = new ArrayList(10);
+        tables.add("test.*");
+        List<FieldInfo> fields = Collections.singletonList(
+                new MetaFieldInfo("data", MetaField.DATA));
+
+        return new MySqlExtractNode("2", "mysql_input", fields,
+                null, option, null,
+                tables, "localhost", "root", "inlong",
+                "test", null, null, true, null,
+                ExtractMode.CDC, null, null);
+    }
+
     private MySqlExtractNode buildAllMigrateExtractNodeWithBytesFormat() {
         List<FieldInfo> fields = Collections.singletonList(
                 new MetaFieldInfo("data", MetaField.DATA_BYTES_DEBEZIUM));
@@ -101,7 +118,7 @@ public class AllMigrateTest {
                         new FieldInfo("data", new StringFormatInfo())));
         CsvFormat csvFormat = new CsvFormat();
         csvFormat.setDisableQuoteCharacter(true);
-        return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
+        return new KafkaLoadNode("3", "kafka_output", fields, relations, null, null,
                 "topic", "localhost:9092",
                 csvFormat, null,
                 null, null);
@@ -140,6 +157,36 @@ public class AllMigrateTest {
         Assert.assertTrue(result.tryExecute());
     }
 
+    /**
+     * Test all migrate with two input nodes and one output node (two relations)
+     *
+     * @throws Exception The exception that may be thrown when executing the case
+     */
+    @Test
+    public void testAllMigrateMultiRelations() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildAllMigrateExtractNode();
+        Node inputNode2 = buildAllMigrateExtractNode2();
+        Node outputNode = buildAllMigrateKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, inputNode2, outputNode),
+                Arrays.asList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode)),
+                        buildNodeRelation(Collections.singletonList(inputNode2),
+                                Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
     /**
      * Test all migrate, the full database data is represented as bytes of canal json
      *