You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/12 05:43:54 UTC

[incubator-seatunnel] branch dev updated: [Hotfix][JDBC-SINK] Fix TiDBCatalog without open (#4718)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 34a7f3eaa [Hotfix][JDBC-SINK] Fix TiDBCatalog without open (#4718)
34a7f3eaa is described below

commit 34a7f3eaa4a837662de2233662b967d8defb3af3
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Fri May 12 13:43:47 2023 +0800

    [Hotfix][JDBC-SINK] Fix TiDBCatalog without open (#4718)
---
 .../api/common/SeaTunnelAPIErrorCode.java          |  1 +
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   | 25 ++++++++++++++--------
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       |  1 -
 3 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
index 6adf373ed..4adb4b8d1 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
@@ -28,6 +28,7 @@ public enum SeaTunnelAPIErrorCode implements SeaTunnelErrorCode {
     FACTORY_INITIALIZE_FAILED("API-06", "Factory initialize failed"),
     DATABASE_ALREADY_EXISTED("API-07", "Database already existed"),
     TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
+    HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
     ;
 
     private final String code;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 2cce860d2..4221172b1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -39,6 +39,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
@@ -57,6 +58,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+
 @AutoService(SeaTunnelSink.class)
 public class JdbcSink
         implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo>,
@@ -195,18 +198,22 @@ public class JdbcSink
                 if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
                     return;
                 }
-                Catalog catalog =
+                try (Catalog catalog =
                         new TiDBCatalogFactory()
                                 .createCatalog(
                                         TiDBCatalogFactory.IDENTIFIER,
-                                        ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)));
-                TablePath tablePath =
-                        TablePath.of(jdbcSinkConfig.getDatabase(), jdbcSinkConfig.getTable());
-                if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
-                    catalog.createDatabase(tablePath, true);
-                }
-                if (!catalog.tableExists(tablePath)) {
-                    catalog.createTable(tablePath, catalogTable, true);
+                                        ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)))) {
+                    catalog.open();
+                    TablePath tablePath =
+                            TablePath.of(jdbcSinkConfig.getDatabase(), jdbcSinkConfig.getTable());
+                    if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
+                        catalog.createDatabase(tablePath, true);
+                    }
+                    if (!catalog.tableExists(tablePath)) {
+                        catalog.createTable(tablePath, catalogTable, true);
+                    }
+                } catch (Exception e) {
+                    throw new JdbcConnectorException(HANDLE_SAVE_MODE_FAILED, e);
                 }
             }
         }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 3ac57502d..372bfa489 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -107,7 +107,6 @@ public class JdbcSinkFactory implements TableSinkFactory {
                         TRANSACTION_TIMEOUT_SEC)
                 .conditional(IS_EXACTLY_ONCE, false, MAX_RETRIES)
                 .conditional(GENERATE_SINK_SQL, true, DATABASE)
-                .conditional(GENERATE_SINK_SQL, true, TABLE)
                 .conditional(GENERATE_SINK_SQL, false, QUERY)
                 .build();
     }