You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/12 11:52:55 UTC

[incubator-seatunnel] branch dev updated: [Improve][Zeta] Fallback when the CatalogTables is empty (#4551)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new fcde8ee9c [Improve][Zeta] Fallback when the CatalogTables is empty (#4551)
fcde8ee9c is described below

commit fcde8ee9cf31d3a6ca6f4337f51f2adb715cd7e8
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Wed Apr 12 19:52:47 2023 +0800

    [Improve][Zeta] Fallback when the CatalogTables is empty (#4551)
---
 .../core/parse/MultipleTableJobConfigParser.java       | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index b4b87cb8e..75bd54a4a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -283,18 +283,22 @@ public class MultipleTableJobConfigParser {
                         TableSourceFactory.class,
                         factoryId,
                         (factory) -> factory.createSource(null));
-        if (fallback) {
+
+        final List<CatalogTable> catalogTables = new ArrayList<>();
+        if (!fallback) {
+            List<CatalogTable> tables =
+                    CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
+            if (!tables.isEmpty()) {
+                catalogTables.addAll(tables);
+            }
+        }
+
+        if (fallback || catalogTables.isEmpty()) {
             Tuple2<CatalogTable, Action> tuple =
                     fallbackParser.parseSource(sourceConfig, jobConfig, tableId, parallelism);
             return new Tuple2<>(tableId, Collections.singletonList(tuple));
         }
 
-        final List<CatalogTable> catalogTables =
-                CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
-        if (catalogTables.isEmpty()) {
-            throw new JobDefineCheckException(
-                    "The source needs catalog table, please configure `catalog` or `schema` options.");
-        }
         if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) == ParsingMode.SHARDING) {
             CatalogTable shardingTable = catalogTables.get(0);
             catalogTables.clear();