You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/12 11:52:55 UTC
[incubator-seatunnel] branch dev updated: [Improve][Zeta] Fallback when the CatalogTables is empty (#4551)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fcde8ee9c [Improve][Zeta] Fallback when the CatalogTables is empty (#4551)
fcde8ee9c is described below
commit fcde8ee9cf31d3a6ca6f4337f51f2adb715cd7e8
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Wed Apr 12 19:52:47 2023 +0800
[Improve][Zeta] Fallback when the CatalogTables is empty (#4551)
---
.../core/parse/MultipleTableJobConfigParser.java | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index b4b87cb8e..75bd54a4a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -283,18 +283,22 @@ public class MultipleTableJobConfigParser {
TableSourceFactory.class,
factoryId,
(factory) -> factory.createSource(null));
- if (fallback) {
+
+ final List<CatalogTable> catalogTables = new ArrayList<>();
+ if (!fallback) {
+ List<CatalogTable> tables =
+ CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
+ if (!tables.isEmpty()) {
+ catalogTables.addAll(tables);
+ }
+ }
+
+ if (fallback || catalogTables.isEmpty()) {
Tuple2<CatalogTable, Action> tuple =
fallbackParser.parseSource(sourceConfig, jobConfig, tableId, parallelism);
return new Tuple2<>(tableId, Collections.singletonList(tuple));
}
- final List<CatalogTable> catalogTables =
- CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
- if (catalogTables.isEmpty()) {
- throw new JobDefineCheckException(
- "The source needs catalog table, please configure `catalog` or `schema` options.");
- }
if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) == ParsingMode.SHARDING) {
CatalogTable shardingTable = catalogTables.get(0);
catalogTables.clear();