Posted to commits@inlong.apache.org by "gong (via GitHub)" <gi...@apache.org> on 2023/03/14 10:29:58 UTC

[GitHub] [inlong] gong commented on a diff in pull request #7590: [INLONG-7589][Sort][Feature] Sort support multi node relation with same output but different input nodes

gong commented on code in PR #7590:
URL: https://github.com/apache/inlong/pull/7590#discussion_r1135329061


##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java:
##########
@@ -132,14 +149,43 @@ public void testAllMigrate() throws Exception {
         Node inputNode = buildAllMigrateExtractNode();
         Node outputNode = buildAllMigrateKafkaNode();
         StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
-                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
                         Collections.singletonList(outputNode))));
         GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
         FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
         ParseResult result = parser.parse();
         Assert.assertTrue(result.tryExecute());
     }
 
+    /**
+     * Test all migrate with two input nodes and one output node (two relations)
+     *
+     * @throws Exception The exception may throws when execute the case
+     */
+    @Test
+    public void testAllMigrateMultiRelations() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+            .newInstance()
+            .useBlinkPlanner()
+            .inStreamingMode()
+            .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildAllMigrateExtractNode();
+        Node inputNode2 = buildAllMigrateExtractNode2();
+        Node outputNode = buildAllMigrateKafkaNode();

Review Comment:
   The Kafka node id needs to be changed to 3.


