You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/10 09:31:27 UTC

[incubator-inlong] branch master updated: [INLONG-3830][Sort] Fix the error that the SQL execution order is inconsistent with the generation order (#4151)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 45cc0f249 [INLONG-3830][Sort] Fix the error that the SQL execution order is inconsistent with the generation order (#4151)
45cc0f249 is described below

commit 45cc0f249dc27cb9202277e1d8769555bc8075c8
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Tue May 10 17:31:22 2022 +0800

    [INLONG-3830][Sort] Fix the error that the SQL execution order is inconsistent with the generation order (#4151)
---
 .../flink/parser/impl/FlinkSqlParser.java             | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
index 5117ca9b5..9247498fe 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
@@ -54,7 +54,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 
 /**
  * Flink sql parse handler
@@ -67,9 +66,9 @@ public class FlinkSqlParser implements Parser {
     private final TableEnvironment tableEnv;
     private final GroupInfo groupInfo;
     private final Set<String> hasParsedSet = new HashSet<>();
-    private final Map<String, String> extractTableSqls = new TreeMap<>();
-    private final Map<String, String> transformTableSqls = new TreeMap<>();
-    private final Map<String, String> loadTableSqls = new TreeMap<>();
+    private final List<String> extractTableSqls = new ArrayList<>();
+    private final List<String> transformTableSqls = new ArrayList<>();
+    private final List<String> loadTableSqls = new ArrayList<>();
     private final List<String> insertSqls = new ArrayList<>();
 
     /**
@@ -118,9 +117,9 @@ public class FlinkSqlParser implements Parser {
             parseStream(streamInfo);
         }
         log.info("parse group success, groupId:{}", groupInfo.getGroupId());
-        List<String> createTableSqls = new ArrayList<>(extractTableSqls.values());
-        createTableSqls.addAll(transformTableSqls.values());
-        createTableSqls.addAll(loadTableSqls.values());
+        List<String> createTableSqls = new ArrayList<>(extractTableSqls);
+        createTableSqls.addAll(transformTableSqls);
+        createTableSqls.addAll(loadTableSqls);
         return new FlinkSqlParseResult(tableEnv, createTableSqls, insertSqls);
     }
 
@@ -183,11 +182,11 @@ public class FlinkSqlParser implements Parser {
 
     private void registerTableSql(Node node, String sql) {
         if (node instanceof ExtractNode) {
-            extractTableSqls.put(node.getId(), sql);
+            extractTableSqls.add(sql);
         } else if (node instanceof TransformNode) {
-            transformTableSqls.put(node.getId(), sql);
+            transformTableSqls.add(sql);
         } else if (node instanceof LoadNode) {
-            loadTableSqls.put(node.getId(), sql);
+            loadTableSqls.add(sql);
         } else {
             throw new UnsupportedOperationException("Only support [ExtractNode|TransformNode|LoadNode]");
         }