Posted to commits@inlong.apache.org by "EMsnap (via GitHub)" <gi...@apache.org> on 2023/03/14 07:15:02 UTC

[GitHub] [inlong] EMsnap opened a new pull request, #7590: [INLONG-7589][Sort][Feature] Sort support multi node relation with same output but different input nodes

EMsnap opened a new pull request, #7590:
URL: https://github.com/apache/inlong/pull/7590

   ### Prepare a Pull Request
   - Fixes #7589 
   
   ### Motivation
   
   
   Sort currently cannot generate multiple INSERT statements for one output node that has multiple input nodes, which does not cover real user cases.
   
   For instance, a user may want to read data from multiple data sources into one sink. For now the only option is a UNION operation, but UNION in Flink is too heavy.
   
   We need to support generating SQL statements from one stream such that:
   
   Insert into sink select * from sourceA;
   Insert into sink select * from sourceB;
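
As a rough illustration of the target output, the following is a minimal sketch that emits one INSERT statement per source table for a shared sink. The class `MultiInsertSketch`, the method `buildInsertStatements`, and the backtick quoting are assumptions made for this example; they are not taken from the InLong code base.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: one INSERT statement per input table, all writing to the same sink. */
public class MultiInsertSketch {

    /** Builds one "INSERT INTO sink SELECT * FROM source" statement for each source table. */
    public static List<String> buildInsertStatements(String sinkTable, List<String> sourceTables) {
        List<String> statements = new ArrayList<>();
        for (String source : sourceTables) {
            // Backtick quoting of identifiers is an assumption of this sketch.
            statements.add("INSERT INTO `" + sinkTable + "` SELECT * FROM `" + source + "`");
        }
        return statements;
    }

    public static void main(String[] args) {
        // Two sources feeding one sink yield two independent statements, with no UNION.
        for (String sql : buildInsertStatements("sink", Arrays.asList("sourceA", "sourceB"))) {
            System.out.println(sql);
        }
    }
}
```

Each statement can then be submitted on its own or collected into one statement set, so no UNION operator is needed to merge the sources.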
   
   ### Modifications
   
   Change parseNode to ParseSingleNode, because the recursive method would need multiple branches and become hard to read. Also move the parsing of input nodes out of the parseNode method to make the code easier to read.
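
One way to picture this restructuring is the sketch below, which is not the actual FlinkSqlParser code. It assumes a simplified `Relation` type holding node ids, a `parseSingleNode` helper that only registers a node, and relations that arrive in an order where inputs can be parsed before outputs. All of these names and simplifications are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of a non-recursive relation walk; not the real parser. */
public class RelationParseSketch {

    /** Simplified stand-in for a node relation: input node ids feeding output node ids. */
    public static final class Relation {
        final List<String> inputs;
        final List<String> outputs;

        public Relation(List<String> inputs, List<String> outputs) {
            this.inputs = inputs;
            this.outputs = outputs;
        }
    }

    private final Set<String> registered = new LinkedHashSet<>();
    private final List<String> inserts = new ArrayList<>();

    /** Handles exactly one node and never recurses into its inputs. */
    private void parseSingleNode(String nodeId) {
        registered.add(nodeId); // the Set ignores a node that was already registered
    }

    /** Walks the relations in order; input parsing happens here, outside parseSingleNode. */
    public List<String> parse(List<Relation> relations) {
        for (Relation relation : relations) {
            for (String input : relation.inputs) {
                parseSingleNode(input);
            }
            for (String output : relation.outputs) {
                parseSingleNode(output);
                // One INSERT per (input, output) pair of this relation.
                for (String input : relation.inputs) {
                    inserts.add("INSERT INTO " + output + " SELECT * FROM " + input);
                }
            }
        }
        return inserts;
    }

    public Set<String> registeredNodes() {
        return registered;
    }

    public static void main(String[] args) {
        RelationParseSketch parser = new RelationParseSketch();
        List<Relation> relations = Arrays.asList(
                new Relation(Collections.singletonList("sourceA"), Collections.singletonList("sink")),
                new Relation(Collections.singletonList("sourceB"), Collections.singletonList("sink")));
        parser.parse(relations).forEach(System.out::println);
    }
}
```

Because the shared sink is registered only once, two relations with the same output but different inputs produce two INSERT statements against a single sink table.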
   
   ### Verifying this change
   
   Run testAllMigrateMultiRelations in AllMigrateTest.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] gong commented on a diff in pull request #7590: [INLONG-7589][Sort][Feature] Sort support multi node relation with same output but different input nodes

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7590:
URL: https://github.com/apache/inlong/pull/7590#discussion_r1135329061


##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java:
##########
@@ -132,14 +149,43 @@ public void testAllMigrate() throws Exception {
         Node inputNode = buildAllMigrateExtractNode();
         Node outputNode = buildAllMigrateKafkaNode();
         StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
-                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
                         Collections.singletonList(outputNode))));
         GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
         FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
         ParseResult result = parser.parse();
         Assert.assertTrue(result.tryExecute());
     }
 
+    /**
+     * Test all migrate with two input nodes and one output node (two relations)
+     *
+     * @throws Exception The exception may throws when execute the case
+     */
+    @Test
+    public void testAllMigrateMultiRelations() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+            .newInstance()
+            .useBlinkPlanner()
+            .inStreamingMode()
+            .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildAllMigrateExtractNode();
+        Node inputNode2 = buildAllMigrateExtractNode2();
+        Node outputNode = buildAllMigrateKafkaNode();

Review Comment:
   The Kafka node id needs to be changed to 3.





[GitHub] [inlong] gong commented on a diff in pull request #7590: [INLONG-7589][Sort][Feature] Sort support multi node relation with same output but different input nodes

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7590:
URL: https://github.com/apache/inlong/pull/7590#discussion_r1135357818


##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java:
##########

Review Comment:
   The SQL generated in this UT may be wrong without any exception being thrown, so the test cannot verify that the generated SQL statement is correct.





[GitHub] [inlong] EMsnap commented on a diff in pull request #7590: [INLONG-7589][Sort] Support multi node relation with same output but different input nodes

Posted by "EMsnap (via GitHub)" <gi...@apache.org>.
EMsnap commented on code in PR #7590:
URL: https://github.com/apache/inlong/pull/7590#discussion_r1135434828


##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java:
##########

Review Comment:
   The UT has been fixed, thanks.





[GitHub] [inlong] gong commented on a diff in pull request #7590: [INLONG-7589][Sort][Feature] Sort support multi node relation with same output but different input nodes

Posted by "gong (via GitHub)" <gi...@apache.org>.
gong commented on code in PR #7590:
URL: https://github.com/apache/inlong/pull/7590#discussion_r1135359780


##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java:
##########

Review Comment:
   You can check the generated SQL:
   <img width="899" alt="image" src="https://user-images.githubusercontent.com/9883232/224979723-455cd1b7-d0cc-49b8-a5c7-f9a1169c0819.png">
   





[GitHub] [inlong] dockerzhang merged pull request #7590: [INLONG-7589][Sort] Support multi node relation with same output but different input nodes

Posted by "dockerzhang (via GitHub)" <gi...@apache.org>.
dockerzhang merged PR #7590:
URL: https://github.com/apache/inlong/pull/7590

