You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/05/10 09:31:27 UTC
[incubator-inlong] branch master updated: [INLONG-3830][Sort] Fix the error that the SQL execution order is inconsistent with the generation order (#4151)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 45cc0f249 [INLONG-3830][Sort] Fix the error that the SQL execution order is inconsistent with the generation order (#4151)
45cc0f249 is described below
commit 45cc0f249dc27cb9202277e1d8769555bc8075c8
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Tue May 10 17:31:22 2022 +0800
[INLONG-3830][Sort] Fix the error that the SQL execution order is inconsistent with the generation order (#4151)
---
.../flink/parser/impl/FlinkSqlParser.java | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
index 5117ca9b5..9247498fe 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
@@ -54,7 +54,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
/**
* Flink sql parse handler
@@ -67,9 +66,9 @@ public class FlinkSqlParser implements Parser {
private final TableEnvironment tableEnv;
private final GroupInfo groupInfo;
private final Set<String> hasParsedSet = new HashSet<>();
- private final Map<String, String> extractTableSqls = new TreeMap<>();
- private final Map<String, String> transformTableSqls = new TreeMap<>();
- private final Map<String, String> loadTableSqls = new TreeMap<>();
+ private final List<String> extractTableSqls = new ArrayList<>();
+ private final List<String> transformTableSqls = new ArrayList<>();
+ private final List<String> loadTableSqls = new ArrayList<>();
private final List<String> insertSqls = new ArrayList<>();
/**
@@ -118,9 +117,9 @@ public class FlinkSqlParser implements Parser {
parseStream(streamInfo);
}
log.info("parse group success, groupId:{}", groupInfo.getGroupId());
- List<String> createTableSqls = new ArrayList<>(extractTableSqls.values());
- createTableSqls.addAll(transformTableSqls.values());
- createTableSqls.addAll(loadTableSqls.values());
+ List<String> createTableSqls = new ArrayList<>(extractTableSqls);
+ createTableSqls.addAll(transformTableSqls);
+ createTableSqls.addAll(loadTableSqls);
return new FlinkSqlParseResult(tableEnv, createTableSqls, insertSqls);
}
@@ -183,11 +182,11 @@ public class FlinkSqlParser implements Parser {
private void registerTableSql(Node node, String sql) {
if (node instanceof ExtractNode) {
- extractTableSqls.put(node.getId(), sql);
+ extractTableSqls.add(sql);
} else if (node instanceof TransformNode) {
- transformTableSqls.put(node.getId(), sql);
+ transformTableSqls.add(sql);
} else if (node instanceof LoadNode) {
- loadTableSqls.put(node.getId(), sql);
+ loadTableSqls.add(sql);
} else {
throw new UnsupportedOperationException("Only support [ExtractNode|TransformNode|LoadNode]");
}