You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/12 05:43:54 UTC
[incubator-seatunnel] branch dev updated: [Hotfix][JDBC-SINK] Fix TiDBCatalog without open (#4718)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 34a7f3eaa [Hotfix][JDBC-SINK] Fix TiDBCatalog without open (#4718)
34a7f3eaa is described below
commit 34a7f3eaa4a837662de2233662b967d8defb3af3
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Fri May 12 13:43:47 2023 +0800
[Hotfix][JDBC-SINK] Fix TiDBCatalog without open (#4718)
---
.../api/common/SeaTunnelAPIErrorCode.java | 1 +
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 25 ++++++++++++++--------
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 1 -
3 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
index 6adf373ed..4adb4b8d1 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelAPIErrorCode.java
@@ -28,6 +28,7 @@ public enum SeaTunnelAPIErrorCode implements SeaTunnelErrorCode {
FACTORY_INITIALIZE_FAILED("API-06", "Factory initialize failed"),
DATABASE_ALREADY_EXISTED("API-07", "Database already existed"),
TABLE_ALREADY_EXISTED("API-08", "Table already existed"),
+ HANDLE_SAVE_MODE_FAILED("API-09", "Handle save mode failed"),
;
private final String code;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 2cce860d2..4221172b1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -39,6 +39,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.tidb.TiDBCatalogFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
@@ -57,6 +58,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
+
@AutoService(SeaTunnelSink.class)
public class JdbcSink
implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo>,
@@ -195,18 +198,22 @@ public class JdbcSink
if (StringUtils.isBlank(jdbcSinkConfig.getDatabase())) {
return;
}
- Catalog catalog =
+ try (Catalog catalog =
new TiDBCatalogFactory()
.createCatalog(
TiDBCatalogFactory.IDENTIFIER,
- ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)));
- TablePath tablePath =
- TablePath.of(jdbcSinkConfig.getDatabase(), jdbcSinkConfig.getTable());
- if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
- catalog.createDatabase(tablePath, true);
- }
- if (!catalog.tableExists(tablePath)) {
- catalog.createTable(tablePath, catalogTable, true);
+ ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)))) {
+ catalog.open();
+ TablePath tablePath =
+ TablePath.of(jdbcSinkConfig.getDatabase(), jdbcSinkConfig.getTable());
+ if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) {
+ catalog.createDatabase(tablePath, true);
+ }
+ if (!catalog.tableExists(tablePath)) {
+ catalog.createTable(tablePath, catalogTable, true);
+ }
+ } catch (Exception e) {
+ throw new JdbcConnectorException(HANDLE_SAVE_MODE_FAILED, e);
}
}
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 3ac57502d..372bfa489 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -107,7 +107,6 @@ public class JdbcSinkFactory implements TableSinkFactory {
TRANSACTION_TIMEOUT_SEC)
.conditional(IS_EXACTLY_ONCE, false, MAX_RETRIES)
.conditional(GENERATE_SINK_SQL, true, DATABASE)
- .conditional(GENERATE_SINK_SQL, true, TABLE)
.conditional(GENERATE_SINK_SQL, false, QUERY)
.build();
}