You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/13 07:27:30 UTC
[incubator-seatunnel] branch api-draft updated: [api-draft][connector] Add SeaTunnel jdbc sink (#1946) (#2009)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 890211c2 [api-draft][connector] Add SeaTunnel jdbc sink (#1946) (#2009)
890211c2 is described below
commit 890211c238f8d64749e9cded51f770efad3774b4
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Mon Jun 13 15:27:26 2022 +0800
[api-draft][connector] Add SeaTunnel jdbc sink (#1946) (#2009)
* Add SeaTunnel jdbc sink (#1946)
* fix code style error
* fix use of deprecated method
Co-authored-by: liuli <li...@analysys.com.cn>
Co-authored-by: Hisoka <10...@qq.com>
---
.../seatunnel-connectors-seatunnel-dist/pom.xml | 5 +
.../seatunnel-connectors-seatunnel/pom.xml | 1 +
.../pom.xml | 32 +-
.../seatunnel/jdbc/internal/JdbcOutputFormat.java | 257 +++++++++++
.../jdbc/internal/connection/DataSourceUtils.java | 104 +++++
.../connection/JdbcConnectionProvider.java | 70 +++
.../connection/SimpleJdbcConnectionProvider.java | 154 +++++++
.../internal/executor/BiConsumerWithException.java | 61 +++
.../executor/JdbcBatchStatementExecutor.java | 36 ++
.../internal/executor/JdbcStatementBuilder.java | 29 ++
.../executor/SimpleBatchStatementExecutor.java | 79 ++++
.../jdbc/internal/options/JdbcConfig.java | 55 +++
.../internal/options/JdbcConnectorOptions.java | 253 +++++++++++
.../jdbc/internal/xa/GroupXaOperationResult.java | 80 ++++
.../jdbc/internal/xa/SemanticXidGenerator.java | 113 +++++
.../seatunnel/jdbc/internal/xa/XaFacade.java | 113 +++++
.../jdbc/internal/xa/XaFacadeImplAutoLoad.java | 478 +++++++++++++++++++++
.../seatunnel/jdbc/internal/xa/XaGroupOps.java | 43 ++
.../seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java | 151 +++++++
.../seatunnel/jdbc/internal/xa/XidGenerator.java | 62 +++
.../seatunnel/jdbc/internal/xa/XidImpl.java | 136 ++++++
.../jdbc/sink/JdbcExactlyOnceSinkWriter.java | 189 ++++++++
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 142 ++++++
.../jdbc/sink/JdbcSinkAggregatedCommitter.java | 91 ++++
.../seatunnel/jdbc/sink/JdbcSinkCommitter.java | 70 +++
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 96 +++++
.../jdbc/state/JdbcAggregatedCommitInfo.java | 30 ++
.../seatunnel/jdbc/state/JdbcSinkState.java | 31 ++
.../connectors/seatunnel/jdbc/state/XidInfo.java | 37 ++
.../seatunnel/jdbc/utils/ExceptionUtils.java | 53 +++
.../connectors/seatunnel/jdbc/utils/JdbcUtils.java | 158 +++++++
.../seatunnel/jdbc/utils/ThrowingRunnable.java | 52 +++
32 files changed, 3251 insertions(+), 10 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index 42a03364..d0013169 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -50,6 +50,11 @@
<artifactId>seatunnel-connector-seatunnel-hive</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-seatunnel-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-connector-seatunnel-socket</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 78e60052..86d0b527 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -35,6 +35,7 @@
<module>seatunnel-connector-seatunnel-console</module>
<module>seatunnel-connector-seatunnel-fake</module>
<module>seatunnel-connector-seatunnel-kafka</module>
+ <module>seatunnel-connector-seatunnel-jdbc</module>
<module>seatunnel-connector-seatunnel-socket</module>
</modules>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
similarity index 65%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
index 78e60052..d5c9ad27 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
@@ -21,20 +21,32 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-connectors</artifactId>
+ <artifactId>seatunnel-connectors-seatunnel</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <artifactId>seatunnel-connectors-seatunnel</artifactId>
+ <artifactId>seatunnel-connector-seatunnel-jdbc</artifactId>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
+
+ </dependencies>
- <modules>
- <module>seatunnel-connector-seatunnel-hive</module>
- <module>seatunnel-connector-seatunnel-console</module>
- <module>seatunnel-connector-seatunnel-fake</module>
- <module>seatunnel-connector-seatunnel-kafka</module>
- <module>seatunnel-connector-seatunnel-socket</module>
- </modules>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
new file mode 100644
index 00000000..f7738140
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+/**
+ * Buffers records and writes them to a JDBC target in batches.
+ *
+ * <p>Records are handed to a {@link JdbcBatchStatementExecutor} and flushed when the configured
+ * batch size is reached, when the optional background timer fires, or when the format is closed.
+ * {@link #writeRecord}, {@link #flush} and {@link #close} are {@code synchronized} on this
+ * instance, and the background flush task takes the same lock.
+ *
+ * @param <I> type of the records accepted by {@link #writeRecord}
+ * @param <E> type of the statement executor that performs the batched writes
+ */
+public class JdbcOutputFormat<I, E extends JdbcBatchStatementExecutor<I>>
+    implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormat.class);
+
+    /** Base back-off between flush retries; the n-th retry waits n times this long. */
+    private static final long RETRY_BACKOFF_MS = 1000L;
+
+    protected final JdbcConnectionProvider connectionProvider;
+
+    private final JdbcConnectorOptions jdbcConnectorOptions;
+    private final StatementExecutorFactory<E> statementExecutorFactory;
+
+    private transient E jdbcStatementExecutor;
+    private transient int batchCount = 0;
+    private transient volatile boolean closed = false;
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    // Failure recorded by the background flush task; surfaced on the next write/flush/close.
+    private transient volatile Exception flushException;
+
+    /**
+     * Creates an output format. No connection is opened until {@link #open()} is called.
+     *
+     * @param connectionProvider       provider of the JDBC connection, must not be null
+     * @param jdbcConnectorOptions     batching and retry options, must not be null
+     * @param statementExecutorFactory factory for the statement executor, must not be null
+     */
+    public JdbcOutputFormat(
+        JdbcConnectionProvider connectionProvider,
+        JdbcConnectorOptions jdbcConnectorOptions,
+        StatementExecutorFactory<E> statementExecutorFactory) {
+        this.connectionProvider = checkNotNull(connectionProvider);
+        this.jdbcConnectorOptions = checkNotNull(jdbcConnectorOptions);
+        this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
+    }
+
+    /**
+     * Connects to the target database, prepares the statements and, when a batch interval is
+     * configured and the batch size is not 1, starts the periodic background flush.
+     *
+     * @throws IOException if the connection cannot be established or the statements cannot be
+     *                     prepared
+     */
+    public void open()
+        throws IOException {
+        try {
+            connectionProvider.getOrEstablishConnection();
+        } catch (Exception e) {
+            throw new IOException("unable to open JDBC writer", e);
+        }
+        jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
+
+        if (jdbcConnectorOptions.getBatchIntervalMs() != 0 && jdbcConnectorOptions.getBatchSize() != 1) {
+            // The counter must live outside the factory lambda: creating it inside would reset
+            // it for every new thread, so a replacement worker would reuse the name "...-1".
+            final AtomicInteger threadCounter = new AtomicInteger(0);
+            this.scheduler =
+                Executors.newScheduledThreadPool(
+                    1, runnable -> {
+                        Thread thread = new Thread(runnable);
+                        thread.setDaemon(true);
+                        thread.setName("jdbc-upsert-output-format" + "-" + threadCounter.incrementAndGet());
+                        return thread;
+                    });
+            this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                    () -> {
+                        synchronized (JdbcOutputFormat.this) {
+                            if (!closed) {
+                                try {
+                                    flush();
+                                } catch (Exception e) {
+                                    // Remember the failure; the writer thread reports it later.
+                                    flushException = e;
+                                }
+                            }
+                        }
+                    },
+                    jdbcConnectorOptions.getBatchIntervalMs(),
+                    jdbcConnectorOptions.getBatchIntervalMs(),
+                    TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Obtains an executor from the factory and prepares its statements on the current connection.
+     */
+    private E createAndOpenStatementExecutor(
+        StatementExecutorFactory<E> statementExecutorFactory)
+        throws IOException {
+        E exec = statementExecutorFactory.get();
+        try {
+            exec.prepareStatements(connectionProvider.getConnection());
+        } catch (SQLException e) {
+            throw new IOException("unable to open JDBC writer", e);
+        }
+        return exec;
+    }
+
+    /** Rethrows, wrapped in a {@link RuntimeException}, a failure recorded by the background flush. */
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to JDBC failed.", flushException);
+        }
+    }
+
+    /**
+     * Adds a record to the current batch and flushes once the configured batch size is reached.
+     *
+     * @param record the record to write
+     * @throws IOException if adding the record or flushing the batch fails
+     */
+    public final synchronized void writeRecord(I record)
+        throws IOException {
+        checkFlushException();
+        try {
+            addToBatch(record);
+            batchCount++;
+            if (jdbcConnectorOptions.getBatchSize() > 0
+                && batchCount >= jdbcConnectorOptions.getBatchSize()) {
+                flush();
+            }
+        } catch (Exception e) {
+            throw new IOException("Writing records to JDBC failed.", e);
+        }
+    }
+
+    /** Hands a single record to the statement executor. */
+    protected void addToBatch(I record)
+        throws SQLException {
+        jdbcStatementExecutor.addToBatch(record);
+    }
+
+    /**
+     * Executes the pending batch, making up to {@code maxRetries + 1} attempts.
+     *
+     * <p>After a failed attempt the connection is validated and, if it is no longer valid,
+     * re-established together with the prepared statements. The n-th retry is preceded by a
+     * sleep of {@code n * 1000} ms, so the first retry happens immediately.
+     *
+     * @throws IOException if the last attempt fails, reconnecting fails, or the thread is
+     *                     interrupted while waiting to retry
+     */
+    public synchronized void flush()
+        throws IOException {
+        checkFlushException();
+        for (int i = 0; i <= jdbcConnectorOptions.getMaxRetries(); i++) {
+            try {
+                attemptFlush();
+                batchCount = 0;
+                break;
+            } catch (SQLException e) {
+                LOG.error("JDBC executeBatch error, retry times = {}", i, e);
+                if (i >= jdbcConnectorOptions.getMaxRetries()) {
+                    ExceptionUtils.rethrowIOException(e);
+                }
+                try {
+                    if (!connectionProvider.isConnectionValid()) {
+                        updateExecutor(true);
+                    }
+                } catch (Exception exception) {
+                    LOG.error(
+                        "JDBC connection is not valid, and reestablish connection failed.",
+                        exception);
+                    throw new IOException("Reestablish JDBC connection failed", exception);
+                }
+                try {
+                    Thread.sleep(RETRY_BACKOFF_MS * i);
+                } catch (InterruptedException ex) {
+                    // Restore the interrupt flag and report the flush failure that led here.
+                    Thread.currentThread().interrupt();
+                    throw new IOException(
+                        "unable to flush; interrupted while doing another attempt", e);
+                }
+            }
+        }
+    }
+
+    /** Executes the batch accumulated in the statement executor (a single attempt). */
+    protected void attemptFlush()
+        throws SQLException {
+        jdbcStatementExecutor.executeBatch();
+    }
+
+    /**
+     * Flushes any pending records and releases all resources of this instance.
+     *
+     * <p>The statements and the connection are released even when the final flush fails; the
+     * flush failure is still propagated as a {@link RuntimeException}.
+     */
+    public synchronized void close() {
+        try {
+            if (!closed) {
+                closed = true;
+
+                if (this.scheduledFuture != null) {
+                    scheduledFuture.cancel(false);
+                    this.scheduler.shutdown();
+                }
+
+                try {
+                    if (batchCount > 0) {
+                        try {
+                            flush();
+                        } catch (Exception e) {
+                            LOG.warn("Writing records to JDBC failed.", e);
+                            throw new RuntimeException("Writing records to JDBC failed.", e);
+                        }
+                    }
+                } finally {
+                    closeStatementsQuietly();
+                }
+            }
+        } finally {
+            connectionProvider.closeConnection();
+        }
+        checkFlushException();
+    }
+
+    /** Closes the executor's statements, logging (not propagating) any {@link SQLException}. */
+    private void closeStatementsQuietly() {
+        try {
+            if (jdbcStatementExecutor != null) {
+                jdbcStatementExecutor.closeStatements();
+            }
+        } catch (SQLException e) {
+            LOG.warn("Close JDBC writer failed.", e);
+        }
+    }
+
+    /**
+     * Closes the current statements and prepares them again.
+     *
+     * @param reconnect if {@code true} the connection is re-established first, otherwise the
+     *                  existing connection is reused
+     * @throws SQLException           if closing or preparing the statements fails
+     * @throws ClassNotFoundException if the driver class cannot be found while reconnecting
+     */
+    public void updateExecutor(boolean reconnect)
+        throws SQLException, ClassNotFoundException {
+        jdbcStatementExecutor.closeStatements();
+        jdbcStatementExecutor.prepareStatements(
+            reconnect ? connectionProvider.reestablishConnection() : connectionProvider.getConnection());
+    }
+
+    @VisibleForTesting
+    public Connection getConnection() {
+        return connectionProvider.getConnection();
+    }
+
+    /**
+     * A factory for creating {@link JdbcBatchStatementExecutor} instance.
+     *
+     * @param <T> The type of instance.
+     */
+    public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
+        extends Supplier<T>, Serializable {}
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
new file mode 100644
index 00000000..1e2e213a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+
+import com.google.common.base.CaseFormat;
+import lombok.NonNull;
+
+import javax.sql.CommonDataSource;
+import javax.sql.DataSource;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Reflection helpers that instantiate a data source class by name and configure it through its
+ * bean-style setters.
+ */
+public class DataSourceUtils
+    implements Serializable {
+    private static final String GETTER_PREFIX = "get";
+
+    private static final String SETTER_PREFIX = "set";
+
+    /**
+     * Instantiates the configured XA data source class and applies the url, user and password
+     * taken from the options.
+     *
+     * @param jdbcConnectorOptions connector options naming the data source class and credentials
+     * @return the configured data source
+     * @throws InvocationTargetException if a setter of the data source throws
+     * @throws IllegalAccessException    if a setter of the data source is not accessible
+     */
+    public static CommonDataSource buildCommonDataSource(@NonNull JdbcConnectorOptions jdbcConnectorOptions)
+        throws InvocationTargetException, IllegalAccessException {
+        CommonDataSource dataSource = (CommonDataSource) loadDataSource(jdbcConnectorOptions.getXaDataSourceClassName());
+        setProperties(dataSource, buildDatabaseAccessConfig(jdbcConnectorOptions));
+        return dataSource;
+    }
+
+    /** Collects the url and, when present, the user and password as bean property values. */
+    private static Map<String, Object> buildDatabaseAccessConfig(JdbcConnectorOptions jdbcConnectorOptions) {
+        Map<String, Object> accessConfig = new HashMap<>();
+        accessConfig.put("url", jdbcConnectorOptions.getUrl());
+        jdbcConnectorOptions.getUsername().ifPresent(user -> accessConfig.put("user", user));
+        jdbcConnectorOptions.getPassword().ifPresent(password -> accessConfig.put("password", password));
+        return accessConfig;
+    }
+
+    /**
+     * Invokes the matching single-argument setter for every configured property. Properties for
+     * which the data source exposes no such setter are skipped.
+     */
+    private static void setProperties(final CommonDataSource commonDataSource, final Map<String, Object> databaseAccessConfig)
+        throws InvocationTargetException, IllegalAccessException {
+        // Reflection lookup is loop-invariant, so resolve the public methods only once.
+        final Method[] methods = commonDataSource.getClass().getMethods();
+        for (Map.Entry<String, Object> entry : databaseAccessConfig.entrySet()) {
+            Optional<Method> method = findSetterMethod(methods, entry.getKey());
+            if (method.isPresent()) {
+                method.get().invoke(commonDataSource, entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Looks up the public no-argument getter for a property. Not referenced elsewhere in this
+     * class at present.
+     */
+    private static Method findGetterMethod(final DataSource dataSource, final String propertyName)
+        throws NoSuchMethodException {
+        String getterMethodName = GETTER_PREFIX + CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, propertyName);
+        Method result = dataSource.getClass().getMethod(getterMethodName);
+        result.setAccessible(true);
+        return result;
+    }
+
+    /** Finds the first method named {@code set<Property>} that takes exactly one parameter. */
+    private static Optional<Method> findSetterMethod(final Method[] methods, final String property) {
+        String setterMethodName = SETTER_PREFIX + CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, property);
+        return Arrays.stream(methods)
+            .filter(each -> each.getName().equals(setterMethodName) && 1 == each.getParameterTypes().length)
+            .findFirst();
+    }
+
+    /**
+     * Loads the named class, trying the thread context class loader first and falling back to
+     * {@link Class#forName(String)}, then instantiates it through its no-argument constructor.
+     *
+     * @throws RuntimeException if the class cannot be loaded or instantiated
+     */
+    private static Object loadDataSource(final String xaDataSourceClassName) {
+        Class<?> xaDataSourceClass = null;
+        // The context class loader may legitimately be null; go straight to the fallback then.
+        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        if (contextClassLoader != null) {
+            try {
+                xaDataSourceClass = contextClassLoader.loadClass(xaDataSourceClassName);
+            } catch (final ClassNotFoundException ignored) {
+                // Not visible to the context class loader; fall back to Class.forName below.
+            }
+        }
+        if (xaDataSourceClass == null) {
+            try {
+                xaDataSourceClass = Class.forName(xaDataSourceClassName);
+            } catch (final ClassNotFoundException ex) {
+                throw new RuntimeException("Failed to load [" + xaDataSourceClassName + "]", ex);
+            }
+        }
+        try {
+            return xaDataSourceClass.getDeclaredConstructor().newInstance();
+        } catch (final ReflectiveOperationException ex) {
+            throw new RuntimeException("Failed to instance [" + xaDataSourceClassName + "]", ex);
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java
new file mode 100644
index 00000000..94c10dc2
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * JDBC connection provider: hands out, validates, closes and re-creates a JDBC
+ * {@link Connection} on behalf of its callers.
+ */
+
+public interface JdbcConnectionProvider {
+ /**
+ * Get the existing connection without trying to establish one.
+ *
+ * @return the existing connection; {@code SimpleJdbcConnectionProvider} returns {@code null}
+ * when none has been established or after it was closed
+ */
+
+ Connection getConnection();
+
+ /**
+ * Check whether a possibly existing connection is valid or not through {@link
+ * Connection#isValid(int)}.
+ *
+ * @return true if the existing connection is valid
+ * @throws SQLException sql exception thrown from {@link Connection#isValid(int)}
+ */
+ boolean isConnectionValid()
+ throws SQLException;
+
+ /**
+ * Get the existing connection or establish a new one if there is none.
+ *
+ * @return existing connection or newly established connection
+ * @throws SQLException sql exception
+ * @throws ClassNotFoundException driver class not found
+ */
+ Connection getOrEstablishConnection()
+ throws SQLException, ClassNotFoundException;
+
+ /**
+ * Close a possibly existing connection.
+ */
+ void closeConnection();
+
+ /**
+ * Close a possibly existing connection and establish a new one.
+ *
+ * @return newly established connection
+ * @throws SQLException sql exception
+ * @throws ClassNotFoundException driver class not found
+ */
+ Connection reestablishConnection()
+ throws SQLException, ClassNotFoundException;
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
new file mode 100644
index 00000000..7e69e990
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Enumeration;
+import java.util.Properties;
+
+/**
+ * A {@link JdbcConnectionProvider} that keeps at most one connection, created directly through
+ * the configured JDBC {@link Driver}.
+ */
+public class SimpleJdbcConnectionProvider
+    implements JdbcConnectionProvider, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private final JdbcConnectorOptions jdbcOptions;
+
+    private transient Driver loadedDriver;
+    private transient Connection connection;
+
+    static {
+        // Touch DriverManager before any driver class is loaded. On JDK 8 the static
+        // initializer of DriverManager itself loads drivers, and two driver classes being
+        // loaded concurrently via Class.forName while DriverManager is still uninitialized
+        // can deadlock against it. JDK 9+ moved driver loading out of that initializer,
+        // so the problem does not occur there.
+        DriverManager.getDrivers();
+    }
+
+    public SimpleJdbcConnectionProvider(@NonNull JdbcConnectorOptions jdbcOptions) {
+        this.jdbcOptions = jdbcOptions;
+    }
+
+    @Override
+    public Connection getConnection() {
+        return connection;
+    }
+
+    @Override
+    public boolean isConnectionValid()
+        throws SQLException {
+        if (connection == null) {
+            return false;
+        }
+        return connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
+    }
+
+    /**
+     * Returns the registered driver whose class name equals {@code driverName}, or instantiates
+     * the class through the thread context class loader when no registered driver matches.
+     */
+    private static Driver loadDriver(String driverName)
+        throws SQLException, ClassNotFoundException {
+        checkNotNull(driverName);
+        for (Enumeration<Driver> registered = DriverManager.getDrivers(); registered.hasMoreElements();) {
+            Driver candidate = registered.nextElement();
+            if (driverName.equals(candidate.getClass().getName())) {
+                return candidate;
+            }
+        }
+
+        // Reaching this point means either:
+        // * DriverManager could not see the driver because of class loader issues (JDK-8146872), or
+        // * the driver is not installed as a service provider.
+        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+        Class<?> driverClass = Class.forName(driverName, true, contextLoader);
+        try {
+            return (Driver) driverClass.getDeclaredConstructor().newInstance();
+        } catch (Exception ex) {
+            throw new SQLException("Fail to create driver of class " + driverName, ex);
+        }
+    }
+
+    /** Resolves the configured driver on first use and caches it for later calls. */
+    private Driver getLoadedDriver()
+        throws SQLException, ClassNotFoundException {
+        if (loadedDriver != null) {
+            return loadedDriver;
+        }
+        loadedDriver = loadDriver(jdbcOptions.getDriverName());
+        return loadedDriver;
+    }
+
+    @Override
+    public Connection getOrEstablishConnection()
+        throws SQLException, ClassNotFoundException {
+        if (connection != null) {
+            return connection;
+        }
+        Driver driver = getLoadedDriver();
+        Properties credentials = new Properties();
+        jdbcOptions.getUsername().ifPresent(user -> credentials.setProperty("user", user));
+        jdbcOptions.getPassword().ifPresent(password -> credentials.setProperty("password", password));
+        connection = driver.connect(jdbcOptions.getUrl(), credentials);
+        if (connection != null) {
+            return connection;
+        }
+        // Mirror the exception DriverManager.getConnection raises when no driver accepts the
+        // URL, so callers see the failure they expect.
+        throw new SQLException(
+            "No suitable driver found for " + jdbcOptions.getUrl(), "08001");
+    }
+
+    @Override
+    public void closeConnection() {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (SQLException e) {
+            LOG.warn("JDBC connection close failed.", e);
+        } finally {
+            // Drop the reference even when close() failed so a new connection can be created.
+            connection = null;
+        }
+    }
+
+    @Override
+    public Connection reestablishConnection()
+        throws SQLException, ClassNotFoundException {
+        closeConnection();
+        return getOrEstablishConnection();
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java
new file mode 100644
index 00000000..24d9d17d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import java.util.function.BiConsumer;
+
+/**
+ * A checked extension of the {@link BiConsumer} interface: a two-argument operation that is
+ * allowed to throw a declared exception type.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ * @param <E> type of the thrown exception
+ */
+@FunctionalInterface
+public interface BiConsumerWithException<T, U, E extends Throwable> {
+
+    /**
+     * Performs this operation on the given arguments.
+     *
+     * @param t the first input argument
+     * @param u the second input argument
+     * @throws E in case of an error
+     */
+    void accept(T t, U u) throws E;
+
+    /**
+     * Convert a {@link BiConsumerWithException} into a {@link BiConsumer}.
+     *
+     * <p>Anything thrown by the wrapped operation is caught and passed to
+     * {@code ExceptionUtils.rethrow}.
+     *
+     * @param biConsumerWithException BiConsumer with exception to convert into a {@link
+     *                                BiConsumer}.
+     * @param <A>                     first input type
+     * @param <B>                     second input type
+     * @return {@link BiConsumer} which rethrows all checked exceptions as unchecked.
+     */
+    static <A, B> BiConsumer<A, B> unchecked(
+        BiConsumerWithException<A, B, ?> biConsumerWithException) {
+        return (A a, B b) -> {
+            try {
+                biConsumerWithException.accept(a, b);
+            } catch (Throwable t) {
+                ExceptionUtils.rethrow(t);
+            }
+        };
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java
new file mode 100644
index 00000000..ec0bde8d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Executes the given JDBC statement in batch for the accumulated records.
+ *
+ * <p>Call order as suggested by the method names: {@link #prepareStatements}, any number of
+ * {@link #addToBatch} / {@link #executeBatch} rounds, then {@link #closeStatements}.
+ *
+ * @param <T> type of the records handed to {@link #addToBatch}
+ */
+public interface JdbcBatchStatementExecutor<T> {
+
+ /**
+ * Create statements from connection.
+ *
+ * @param connection connection the statements are created from
+ * @throws SQLException if an implementation fails to create its statements
+ */
+ void prepareStatements(Connection connection) throws SQLException;
+
+ /**
+ * Hands one record to the executor so that it becomes part of the next batch.
+ *
+ * @param record the record to add
+ * @throws SQLException if an implementation fails while adding the record
+ */
+ void addToBatch(T record) throws SQLException;
+
+ /**
+ * Submits a batch of commands to the database for execution.
+ *
+ * @throws SQLException if the batch cannot be executed
+ */
+ void executeBatch() throws SQLException;
+
+ /**
+ * Close JDBC related statements.
+ *
+ * @throws SQLException if closing a statement fails
+ */
+ void closeStatements() throws SQLException;
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java
new file mode 100644
index 00000000..f8b778c4
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets {@link PreparedStatement} parameters to use in JDBC Sink based on a specific type of
+ * record.
+ *
+ * <p>The statement and the record arrive through the inherited
+ * {@code accept(PreparedStatement, T)} method, which may throw {@link SQLException}. The
+ * interface is {@link Serializable}, so implementations must be serializable as well.
+ *
+ * @param <T> type of the record whose values are bound to the statement
+ */
+public interface JdbcStatementBuilder<T>
+ extends BiConsumerWithException<PreparedStatement, T, SQLException>, Serializable {}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
new file mode 100644
index 00000000..bb4340b5
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link JdbcBatchStatementExecutor} that executes the supplied statement for the given records
+ * (without any pre-processing).
+ *
+ * <p>Records are buffered in memory by {@link #addToBatch}; {@link #executeBatch()} binds each of
+ * them to the single prepared statement, submits the JDBC batch and empties the buffer.
+ *
+ * @param <T> type of the buffered records
+ */
+public class SimpleBatchStatementExecutor<T> implements JdbcBatchStatementExecutor<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleBatchStatementExecutor.class);
+
+    private final String sql;
+    private final JdbcStatementBuilder<T> parameterSetter;
+    private final List<T> batch;
+
+    private transient PreparedStatement st;
+
+    /**
+     * @param sql the statement text passed to {@link Connection#prepareStatement(String)}
+     * @param statementBuilder binds one record to the prepared statement
+     */
+    public SimpleBatchStatementExecutor(String sql, JdbcStatementBuilder<T> statementBuilder) {
+        this.sql = sql;
+        this.parameterSetter = statementBuilder;
+        this.batch = new ArrayList<>();
+    }
+
+    /** Prepares the statement for {@code sql} on the given connection. */
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        this.st = connection.prepareStatement(sql);
+    }
+
+    /** Buffers the record; nothing is sent to the database yet. */
+    @Override
+    public void addToBatch(T record) {
+        batch.add(record);
+    }
+
+    /**
+     * Binds every buffered record to the statement, executes the JDBC batch and clears the
+     * buffer. Does nothing when no record is buffered.
+     */
+    @Override
+    public void executeBatch() throws SQLException {
+        if (batch.isEmpty()) {
+            return;
+        }
+        for (T pending : batch) {
+            parameterSetter.accept(st, pending);
+            st.addBatch();
+        }
+        st.executeBatch();
+        // only reached when the batch was executed without an exception
+        batch.clear();
+    }
+
+    /** Closes the prepared statement, if one is currently held. */
+    @Override
+    public void closeStatements() throws SQLException {
+        if (st == null) {
+            return;
+        }
+        st.close();
+        st = null;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java
new file mode 100644
index 00000000..01348508
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options;
+
+import java.io.Serializable;
+
+/**
+ * Names of the configuration keys used by the JDBC connector.
+ *
+ * <p>The {@code Config}-based constructor of {@code JdbcConnectorOptions} looks these keys up;
+ * the default values applied when an optional key is absent are defined there.
+ */
+public class JdbcConfig implements Serializable {
+
+ /** Key of the JDBC URL. */
+ public static final String URL = "url";
+
+ /** Key of the JDBC driver name. */
+ public static final String DRIVER = "driver";
+
+ /** Key of the connection check timeout, in seconds. */
+ public static final String CONNECTION_CHECK_TIMEOUT_SEC = "connection_check_timeout_sec";
+
+ /** Key of the maximum number of retries. */
+ public static final String MAX_RETRIES = "max_retries";
+
+ /** Key of the (optional) database user. */
+ public static final String USER = "user";
+
+ /** Key of the (optional) database password. */
+ public static final String PASSWORD = "password";
+
+ /** Key of the SQL text to execute. */
+ public static final String QUERY = "query";
+
+ /** NOTE(review): no reader of this key is visible in JdbcConnectorOptions - confirm its use. */
+ public static final String PARALLELISM = "parallelism";
+
+
+ /** Key of the batch size. */
+ public static final String BATCH_SIZE = "batch_size";
+
+ /** Key of the batch interval, in milliseconds. */
+ public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
+
+
+ /** Key of the exactly-once switch. */
+ public static final String IS_EXACTLY_ONCE = "is_exactly_once";
+
+ /** Key of the XA data source class name, read in exactly-once mode. */
+ public static final String XA_DATA_SOURCE_CLASS_NAME = "xa_data_source_class_name";
+
+
+ /** Key of the maximum number of commit attempts, read in exactly-once mode. */
+ public static final String MAX_COMMIT_ATTEMPTS = "max_commit_attempts";
+
+ /** Key of the transaction timeout, in seconds, read in exactly-once mode. */
+ public static final String TRANSACTION_TIMEOUT_SEC = "transaction_timeout_sec";
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java
new file mode 100644
index 00000000..7b1207f1
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.NonNull;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+public class JdbcConnectorOptions
+ implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final int DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC = 30;
+ private static final int DEFAULT_MAX_RETRIES = 3;
+ private static final int DEFAULT_BATCH_SIZE = 300;
+ private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
+ private static final boolean DEFAULT_IS_EXACTLY_ONCE = false;
+ private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
+ private static final int DEFAULT_TRANSACTION_TIMEOUT_SEC = -1;
+
+ private String url;
+ private String driverName;
+ private int connectionCheckTimeoutSeconds = DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC;
+ private int maxRetries = DEFAULT_MAX_RETRIES;
+ private String username;
+ private String password;
+ private String query;
+
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ private int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+
+ private boolean isExactlyOnce = DEFAULT_IS_EXACTLY_ONCE;
+ private String xaDataSourceClassName;
+
+ private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+
+ private int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC;
+
+ private JdbcConnectorOptions() {
+ }
+
+ public JdbcConnectorOptions(@NonNull Config config) {
+ this.url = config.getString(JdbcConfig.URL);
+ this.driverName = config.getString(JdbcConfig.DRIVER);
+ if (config.hasPath(JdbcConfig.USER)) {
+ this.username = config.getString(JdbcConfig.USER);
+ }
+ if (config.hasPath(JdbcConfig.PASSWORD)) {
+ this.password = config.getString(JdbcConfig.PASSWORD);
+ }
+ this.query = config.getString(JdbcConfig.QUERY);
+
+ if (config.hasPath(JdbcConfig.MAX_RETRIES)) {
+ this.maxRetries = config.getInt(JdbcConfig.MAX_RETRIES);
+ }
+ if (config.hasPath(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC)) {
+ this.connectionCheckTimeoutSeconds = config.getInt(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC);
+ }
+ if (config.hasPath(JdbcConfig.BATCH_SIZE)) {
+ this.batchSize = config.getInt(JdbcConfig.BATCH_SIZE);
+ }
+ if (config.hasPath(JdbcConfig.BATCH_INTERVAL_MS)) {
+ this.batchIntervalMs = config.getInt(JdbcConfig.BATCH_INTERVAL_MS);
+ }
+
+ if (config.hasPath(JdbcConfig.IS_EXACTLY_ONCE)) {
+ this.isExactlyOnce = true;
+ this.xaDataSourceClassName = config.getString(JdbcConfig.XA_DATA_SOURCE_CLASS_NAME);
+ if (config.hasPath(JdbcConfig.MAX_COMMIT_ATTEMPTS)) {
+ this.maxCommitAttempts = config.getInt(JdbcConfig.MAX_COMMIT_ATTEMPTS);
+ }
+ if (config.hasPath(JdbcConfig.TRANSACTION_TIMEOUT_SEC)) {
+ this.transactionTimeoutSec = config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC);
+ }
+ }
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getDriverName() {
+ return driverName;
+ }
+
+ public int getConnectionCheckTimeoutSeconds() {
+ return connectionCheckTimeoutSeconds;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public Optional<String> getUsername() {
+ return Optional.ofNullable(username);
+ }
+
+ public Optional<String> getPassword() {
+ return Optional.ofNullable(password);
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public int getBatchIntervalMs() {
+ return batchIntervalMs;
+ }
+
+ public boolean isExactlyOnce() {
+ return isExactlyOnce;
+ }
+
+ public String getXaDataSourceClassName() {
+ return xaDataSourceClassName;
+ }
+
+ public int getMaxCommitAttempts() {
+ return maxCommitAttempts;
+ }
+
+ public Optional<Integer> getTransactionTimeoutSec() {
+ return transactionTimeoutSec < 0 ? Optional.empty() : Optional.of(transactionTimeoutSec);
+ }
+
+ public static JdbcConnectorOptionsBuilder builder() {
+ return new JdbcConnectorOptionsBuilder();
+ }
+
+ public static final class JdbcConnectorOptionsBuilder {
+ private String url;
+ private String driverName;
+ private int connectionCheckTimeoutSeconds = DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC;
+ private int maxRetries = DEFAULT_MAX_RETRIES;
+ private String username;
+ private String password;
+ private String query;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ private int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+ private boolean isExactlyOnce = DEFAULT_IS_EXACTLY_ONCE;
+ private String xaDataSourceClassName;
+ private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+ private int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC;
+
+ private JdbcConnectorOptionsBuilder() {
+ }
+
+ public JdbcConnectorOptionsBuilder withUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withDriverName(String driverName) {
+ this.driverName = driverName;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withConnectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
+ this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withMaxRetries(int maxRetries) {
+ this.maxRetries = maxRetries;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withUsername(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withPassword(String password) {
+ this.password = password;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withQuery(String query) {
+ this.query = query;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withBatchIntervalMs(int batchIntervalMs) {
+ this.batchIntervalMs = batchIntervalMs;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withIsExactlyOnce(boolean isExactlyOnce) {
+ this.isExactlyOnce = isExactlyOnce;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withXaDataSourceClassName(String xaDataSourceClassName) {
+ this.xaDataSourceClassName = xaDataSourceClassName;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withMaxCommitAttempts(int maxCommitAttempts) {
+ this.maxCommitAttempts = maxCommitAttempts;
+ return this;
+ }
+
+ public JdbcConnectorOptionsBuilder withTransactionTimeoutSec(int transactionTimeoutSec) {
+ this.transactionTimeoutSec = transactionTimeoutSec;
+ return this;
+ }
+
+ public JdbcConnectorOptions build() {
+ JdbcConnectorOptions jdbcConnectorOptions = new JdbcConnectorOptions();
+ jdbcConnectorOptions.batchSize = this.batchSize;
+ jdbcConnectorOptions.batchIntervalMs = this.batchIntervalMs;
+ jdbcConnectorOptions.driverName = this.driverName;
+ jdbcConnectorOptions.maxRetries = this.maxRetries;
+ jdbcConnectorOptions.password = this.password;
+ jdbcConnectorOptions.connectionCheckTimeoutSeconds = this.connectionCheckTimeoutSeconds;
+ jdbcConnectorOptions.query = this.query;
+ jdbcConnectorOptions.url = this.url;
+ jdbcConnectorOptions.username = this.username;
+ jdbcConnectorOptions.transactionTimeoutSec = this.transactionTimeoutSec;
+ jdbcConnectorOptions.maxCommitAttempts = this.maxCommitAttempts;
+ jdbcConnectorOptions.isExactlyOnce = this.isExactlyOnce;
+ jdbcConnectorOptions.xaDataSourceClassName = this.xaDataSourceClassName;
+ return jdbcConnectorOptions;
+ }
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java
new file mode 100644
index 00000000..34952249
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class GroupXaOperationResult<T> {
+ private final List<T> succeeded = new ArrayList<>();
+ private final List<T> failed = new ArrayList<>();
+ private final List<T> toRetry = new ArrayList<>();
+ private Optional<Exception> failure = Optional.empty();
+ private Optional<Exception> transientFailure = Optional.empty();
+
+ void failedTransiently(T x, XaFacade.TransientXaException e) {
+ toRetry.add(x);
+ transientFailure =
+ getTransientFailure().isPresent() ? getTransientFailure() : Optional.of(e);
+ }
+
+ void failed(T x, Exception e) {
+ failed.add(x);
+ failure = failure.isPresent() ? failure : Optional.of(e);
+ }
+
+ void succeeded(T x) {
+ succeeded.add(x);
+ }
+
+ private RuntimeException wrapFailure(
+ Exception error, String formatWithCounts, int errCount) {
+ return new RuntimeException(
+ String.format(formatWithCounts, errCount, total()), error);
+ }
+
+ private int total() {
+ return succeeded.size() + failed.size() + toRetry.size();
+ }
+
+ public List<T> getForRetry() {
+ return toRetry;
+ }
+
+ Optional<Exception> getTransientFailure() {
+ return transientFailure;
+ }
+
+ boolean hasNoFailures() {
+ return !failure.isPresent() && !transientFailure.isPresent();
+ }
+
+ void throwIfAnyFailed(String action) {
+ failure.map(
+ f ->
+ wrapFailure(
+ f,
+ "failed to " + action + " %d transactions out of %d",
+ toRetry.size() + failed.size()))
+ .ifPresent(
+ f -> {
+ throw f;
+ });
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
new file mode 100644
index 00000000..98825a57
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+
+import javax.transaction.xa.Xid;
+
+import java.security.SecureRandom;
+import java.util.Arrays;
+
+/**
+ * Generates {@link Xid} from:
+ *
+ * <ol>
+ *   <li>gtrid, which provides uniqueness over other jobs and apps, and other instances of this
+ *       job, and consists of
+ *       <ul>
+ *         <li>job id (32 bytes; shorter ids are padded with trailing zero bytes)
+ *         <li>subtask index (4 bytes)
+ *         <li>checkpoint id (8 bytes)
+ *       </ul>
+ *   <li>bqual, which consists of 4 random bytes (generated using {@link SecureRandom})
+ * </ol>
+ *
+ * <p>Each {@link SemanticXidGenerator} instance MUST be used for only one Sink (otherwise Xids will
+ * collide).
+ */
+class SemanticXidGenerator
+        implements XidGenerator {
+    private static final long serialVersionUID = 1L;
+
+    private static final SecureRandom SECURE_RANDOM = new SecureRandom();
+
+    private static final int JOB_ID_BYTES = 32;
+    private static final int FORMAT_ID = 201;
+
+    private transient byte[] gtridBuffer;
+    private transient byte[] bqualBuffer;
+
+    /** Allocates the reusable gtrid buffer and draws the random branch qualifier. */
+    @Override
+    public void open() {
+        // globalTransactionId = job id + task index + checkpoint id
+        gtridBuffer = new byte[JOB_ID_BYTES + Integer.BYTES + Long.BYTES];
+        // branchQualifier = random bytes
+        bqualBuffer = getRandomBytes(Integer.BYTES);
+    }
+
+    /**
+     * Builds the Xid for the given job, subtask and checkpoint.
+     *
+     * @throws IllegalArgumentException if the job id is longer than 32 bytes
+     */
+    @Override
+    public Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext, long checkpointId) {
+        byte[] jobIdBytes = context.getJobId().getBytes();
+        checkArgument(jobIdBytes.length <= JOB_ID_BYTES);
+        // Copy only the bytes that exist: copying JOB_ID_BYTES from a shorter array throws
+        // IndexOutOfBoundsException. The rest of the slot is zeroed because the buffer is reused.
+        System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, jobIdBytes.length);
+        Arrays.fill(gtridBuffer, jobIdBytes.length, JOB_ID_BYTES, (byte) 0);
+
+        writeNumber(sinkContext.getIndexOfSubtask(), Integer.BYTES, gtridBuffer, JOB_ID_BYTES);
+        writeNumber(checkpointId, Long.BYTES, gtridBuffer, JOB_ID_BYTES + Integer.BYTES);
+        // relying on arrays copying inside XidImpl constructor
+        return new XidImpl(FORMAT_ID, gtridBuffer, bqualBuffer);
+    }
+
+    /**
+     * Tells whether {@code xid} was generated for this job and may be handled by this subtask.
+     *
+     * <p>An Xid of the same job is accepted when its subtask index equals this subtask's index,
+     * or when that index is not below the current number of parallel subtasks.
+     */
+    @Override
+    public boolean belongsToSubtask(Xid xid, SeaTunnelContext context, SinkWriter.Context sinkContext) {
+        if (xid.getFormatId() != FORMAT_ID) {
+            return false;
+        }
+        byte[] gtrid = xid.getGlobalTransactionId();
+        // a foreign Xid may share the format id but be too short to hold job id + subtask index
+        if (gtrid == null || gtrid.length < JOB_ID_BYTES + Integer.BYTES) {
+            return false;
+        }
+        int subtaskIndex = readNumber(gtrid, JOB_ID_BYTES, Integer.BYTES);
+        if (subtaskIndex != sinkContext.getIndexOfSubtask()
+                && subtaskIndex <= sinkContext.getNumberOfParallelSubtasks() - 1) {
+            return false;
+        }
+        byte[] expectedJobId = context.getJobId().getBytes();
+        if (expectedJobId.length > JOB_ID_BYTES) {
+            // generateXid() rejects such ids, so no Xid of this generator can carry one
+            return false;
+        }
+        // compare against the zero-padded form that generateXid() writes
+        return Arrays.equals(
+                Arrays.copyOf(gtrid, JOB_ID_BYTES), Arrays.copyOf(expectedJobId, JOB_ID_BYTES));
+    }
+
+    /** Reads {@code numBytes} bytes starting at {@code offset} as a little-endian number. */
+    private static int readNumber(byte[] bytes, int offset, int numBytes) {
+        final int number = 0xff;
+        int result = 0;
+        for (int i = 0; i < numBytes; i++) {
+            result |= (bytes[offset + i] & number) << Byte.SIZE * i;
+        }
+        return result;
+    }
+
+    /** Writes the low {@code numBytes} bytes of {@code number}, least significant byte first. */
+    private static void writeNumber(long number, int numBytes, byte[] dst, int dstOffset) {
+        for (int i = dstOffset; i < dstOffset + numBytes; i++) {
+            dst[i] = (byte) number;
+            number >>>= Byte.SIZE;
+        }
+    }
+
+    private byte[] getRandomBytes(int size) {
+        byte[] bytes = new byte[size];
+        SECURE_RANDOM.nextBytes(bytes);
+        return bytes;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
new file mode 100644
index 00000000..2f8b78be
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Facade over the XA transaction operations (start, end-and-prepare, commit, rollback, recover)
+ * combined with a {@link JdbcConnectionProvider}.
+ *
+ * <p>Typical workflow:
+ *
+ * <ol>
+ * <li>{@link #open}
+ * <li>{@link #start} transaction
+ * <li>{@link #getConnection}, write some data
+ * <li>{@link #endAndPrepare} (or {@link #failAndRollback})
+ * <li>{@link #commit} / {@link #rollback}
+ * <li>{@link #close}
+ * </ol>
+ *
+ * {@link #recover} can be used to get abandoned prepared transactions for cleanup.
+ */
+
+public interface XaFacade
+ extends JdbcConnectionProvider, Serializable, AutoCloseable {
+
+ /**
+ * Creates the facade implementation ({@code XaFacadeImplAutoLoad}) for the given options.
+ *
+ * @param jdbcConnectorOptions connector options handed to the implementation
+ */
+ static XaFacade fromJdbcConnectionOptions(
+ JdbcConnectorOptions jdbcConnectorOptions) {
+ return new XaFacadeImplAutoLoad(jdbcConnectorOptions);
+ }
+
+ /** Opens the facade; first step of the workflow described on this interface. */
+ void open() throws Exception;
+
+ /** @return whether the facade is currently open (presumably: opened and not yet closed) */
+ boolean isOpen();
+
+ /** Start a new transaction. */
+ void start(Xid xid) throws Exception;
+
+ /** End and then prepare the transaction. Transaction can't be resumed afterwards. */
+ void endAndPrepare(Xid xid) throws Exception;
+
+ /**
+ * Commit previously prepared transaction.
+ *
+ * @param ignoreUnknown whether to ignore {@link XAException#XAER_NOTA
+ * XAER_NOTA} error.
+ */
+ void commit(Xid xid, boolean ignoreUnknown) throws TransientXaException;
+
+ /** Rollback previously prepared transaction. */
+ void rollback(Xid xid) throws TransientXaException;
+
+ /**
+ * End transaction as {@link javax.transaction.xa.XAResource#TMFAIL failed}; in case of error,
+ * try to roll it back.
+ */
+ void failAndRollback(Xid xid) throws TransientXaException;
+
+ /**
+ * Note: this can block on some non-MVCC databases if there are ended not prepared transactions.
+ */
+ Collection<Xid> recover() throws TransientXaException;
+
+ /**
+ * Thrown by {@link XaFacade} when RM responds with {@link
+ * javax.transaction.xa.XAResource#XA_RDONLY XA_RDONLY} indicating that the transaction doesn't
+ * include any changes. When such a transaction is committed RM may return an error (usually,
+ * {@link XAException#XAER_NOTA XAER_NOTA}).
+ */
+ class EmptyXaTransactionException extends RuntimeException {
+ private final Xid xid;
+
+ EmptyXaTransactionException(Xid xid) {
+ super("end response XA_RDONLY, xid: " + xid);
+ this.xid = xid;
+ }
+
+ /** @return the id of the transaction that turned out to be empty */
+ public Xid getXid() {
+ return xid;
+ }
+ }
+
+ /**
+ * Indicates a transient or unknown failure from the resource manager (see {@link
+ * XAException#XA_RBTRANSIENT XA_RBTRANSIENT}, {@link XAException#XAER_RMFAIL XAER_RMFAIL}).
+ */
+ class TransientXaException extends RuntimeException {
+ TransientXaException(XAException cause) {
+ super(cause);
+ }
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
new file mode 100644
index 00000000..d8c969ce
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static javax.transaction.xa.XAException.XAER_NOTA;
+import static javax.transaction.xa.XAException.XAER_RMFAIL;
+import static javax.transaction.xa.XAException.XA_HEURCOM;
+import static javax.transaction.xa.XAException.XA_HEURHAZ;
+import static javax.transaction.xa.XAException.XA_HEURMIX;
+import static javax.transaction.xa.XAException.XA_HEURRB;
+import static javax.transaction.xa.XAException.XA_RBBASE;
+import static javax.transaction.xa.XAException.XA_RBTIMEOUT;
+import static javax.transaction.xa.XAException.XA_RBTRANSIENT;
+import static javax.transaction.xa.XAResource.TMENDRSCAN;
+import static javax.transaction.xa.XAResource.TMNOFLAGS;
+import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ThrowingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.XAConnection;
+import javax.sql.XADataSource;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Default {@link org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade} implementation.
+ */
+public class XaFacadeImplAutoLoad
+ implements XaFacade {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImplAutoLoad.class);
+ private static final Set<Integer> TRANSIENT_ERR_CODES =
+ new HashSet<>(Arrays.asList(XA_RBTRANSIENT, XAER_RMFAIL));
+ private static final Set<Integer> HEUR_ERR_CODES =
+ new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, XA_HEURMIX));
+ private static final int MAX_RECOVER_CALLS = 100;
+
+ private final JdbcConnectorOptions jdbcConnectorOptions;
+ private transient XAResource xaResource;
+ private transient Connection connection;
+ private transient XAConnection xaConnection;
+
+ XaFacadeImplAutoLoad(JdbcConnectorOptions jdbcConnectorOptions) {
+ checkState(jdbcConnectorOptions.isExactlyOnce(), "is_exactly_once config error");
+ this.jdbcConnectorOptions = jdbcConnectorOptions;
+ }
+
+ @Override
+ public void open() throws SQLException {
+ checkState(!isOpen(), "already connected");
+ XADataSource ds;
+ try {
+ ds = (XADataSource) DataSourceUtils.buildCommonDataSource(jdbcConnectorOptions);
+ }
+ catch (Exception e) {
+ throw new SQLException(e);
+ }
+ xaConnection = ds.getXAConnection();
+ xaResource = xaConnection.getXAResource();
+ if (jdbcConnectorOptions.getTransactionTimeoutSec().isPresent()) {
+ try {
+ xaResource.setTransactionTimeout(jdbcConnectorOptions.getTransactionTimeoutSec().get());
+ }
+ catch (XAException e) {
+ throw new SQLException(e);
+ }
+ }
+ connection = xaConnection.getConnection();
+ connection.setReadOnly(false);
+ connection.setAutoCommit(false);
+ checkState(!connection.getAutoCommit());
+ }
+
+ @Override
+ public void close() throws SQLException {
+ if (connection != null) {
+ connection.close(); // close connection - likely a wrapper
+ connection = null;
+ }
+ try {
+ xaConnection.close(); // close likely a pooled AND the underlying connection
+ }
+ catch (SQLException e) {
+ // Some databases (e.g. MySQL) rollback changes on normal client disconnect which
+ // causes an exception if an XA transaction was prepared. Note that resources are
+ // still released in case of an error. Pinning MySQL connections doesn't help as
+ // SuspendableXAConnection has the same close() logic.
+ // Other DBs don't rollback, e.g. for PgSql the previous connection.close() call
+ // disassociates the connection (and that call works because it has a check for XA)
+ // and rollback() is not called.
+ // In either case, not closing the XA connection here leads to the resource leak.
+ LOG.warn("unable to close XA connection", e);
+ }
+ xaResource = null;
+ }
+
+ @Override
+ public Connection getConnection() {
+ checkNotNull(connection);
+ return connection;
+ }
+
+ @Override
+ public boolean isConnectionValid() throws SQLException {
+ return isOpen() && connection.isValid(connection.getNetworkTimeout());
+ }
+
+ @Override
+ public Connection getOrEstablishConnection() throws SQLException {
+ if (!isOpen()) {
+ open();
+ }
+ return connection;
+ }
+
+ @Override
+ public void closeConnection() {
+ try {
+ close();
+ }
+ catch (SQLException e) {
+ LOG.warn("Connection close failed.", e);
+ }
+ }
+
+ @Override
+ public Connection reestablishConnection() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void start(Xid xid) {
+ execute(Command.fromRunnable("start", xid, () -> xaResource.start(xid, TMNOFLAGS)));
+ }
+
+ @Override
+ public void endAndPrepare(Xid xid) {
+ execute(Command.fromRunnable("end", xid, () -> xaResource.end(xid, XAResource.TMSUCCESS)));
+ int prepResult = execute(new Command<>("prepare", of(xid), () -> xaResource.prepare(xid)));
+ if (prepResult == XAResource.XA_RDONLY) {
+ throw new EmptyXaTransactionException(xid);
+ }
+ else if (prepResult != XAResource.XA_OK) {
+ throw new RuntimeException(
+ formatErrorMessage("prepare", of(xid), empty(), "response: " + prepResult));
+ }
+ }
+
+ @Override
+ public void failAndRollback(Xid xid) {
+ execute(
+ Command.fromRunnable(
+ "end (fail)",
+ xid,
+ () -> {
+ xaResource.end(xid, XAResource.TMFAIL);
+ xaResource.rollback(xid);
+ },
+ err -> {
+ if (err.errorCode >= XA_RBBASE) {
+ rollback(xid);
+ }
+ else {
+ LOG.warn(
+ formatErrorMessage(
+ "end (fail)", of(xid), of(err.errorCode)));
+ }
+ }));
+ }
+
+ @Override
+ public void commit(Xid xid, boolean ignoreUnknown) {
+ execute(
+ Command.fromRunnableRecoverByWarn(
+ "commit",
+ xid,
+ () ->
+ xaResource.commit(
+ xid,
+ false /* not onePhase because the transaction should be prepared already */),
+ e -> buildCommitErrorDesc(e, ignoreUnknown)));
+ }
+
+ @Override
+ public void rollback(Xid xid) {
+ execute(
+ Command.fromRunnableRecoverByWarn(
+ "rollback",
+ xid,
+ () -> xaResource.rollback(xid),
+ this::buildRollbackErrorDesc));
+ }
+
+ private void forget(Xid xid) {
+ execute(
+ Command.fromRunnableRecoverByWarn(
+ "forget",
+ xid,
+ () -> xaResource.forget(xid),
+ e -> of("manual cleanup may be required")));
+ }
+
+ @Override
+ public Collection<Xid> recover() {
+ return execute(
+ new Command<>(
+ "recover",
+ empty(),
+ () -> {
+ List<Xid> list = recover(TMSTARTRSCAN);
+ try {
+ for (int i = 0; list.addAll(recover(TMNOFLAGS)); i++) {
+ // H2 sometimes returns same tx list here - should probably use
+ // recover(TMSTARTRSCAN | TMENDRSCAN)
+ checkState(
+ i < MAX_RECOVER_CALLS, "too many xa_recover() calls");
+ }
+ }
+ finally {
+ recover(TMENDRSCAN);
+ }
+ return list;
+ }));
+ }
+
+ @Override
+ public boolean isOpen() {
+ return xaResource != null;
+ }
+
+ private List<Xid> recover(int flags) throws XAException {
+ return Arrays.asList(xaResource.recover(flags));
+ }
+
+ private <T> T execute(Command<T> cmd) throws RuntimeException {
+ checkState(isOpen(), "not connected");
+ LOG.debug("{}, xid={}", cmd.name, cmd.xid);
+ try {
+ T result = cmd.callable.call();
+ LOG.trace("{} succeeded , xid={}", cmd.name, cmd.xid);
+ return result;
+ }
+ catch (XAException e) {
+ if (HEUR_ERR_CODES.contains(e.errorCode)) {
+ cmd.xid.ifPresent(this::forget);
+ }
+ return cmd.recover.apply(e).orElseThrow(() -> wrapException(cmd.name, cmd.xid, e));
+ }
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw wrapException(cmd.name, cmd.xid, e);
+ }
+ }
+
+ private static RuntimeException wrapException(
+ String action, Optional<Xid> xid, Exception ex) {
+ if (ex instanceof XAException) {
+ XAException xa = (XAException) ex;
+ if (TRANSIENT_ERR_CODES.contains(xa.errorCode)) {
+ throw new TransientXaException(xa);
+ }
+ else {
+ throw new RuntimeException(
+ formatErrorMessage(action, xid, of(xa.errorCode), xa.getMessage()));
+ }
+ }
+ else {
+ throw new RuntimeException(
+ formatErrorMessage(action, xid, empty(), ex.getMessage()), ex);
+ }
+ }
+
+ private Optional<String> buildCommitErrorDesc(XAException err, boolean ignoreUnknown) {
+ if (err.errorCode == XA_HEURCOM) {
+ return Optional.of("transaction was heuristically committed earlier");
+ }
+ else if (ignoreUnknown && err.errorCode == XAER_NOTA) {
+ return Optional.of("transaction is unknown to RM (ignoring)");
+ }
+ else {
+ return empty();
+ }
+ }
+
+ private Optional<String> buildRollbackErrorDesc(XAException err) {
+ if (err.errorCode == XA_HEURRB) {
+ return Optional.of("transaction was already heuristically rolled back");
+ }
+ else if (err.errorCode >= XA_RBBASE) {
+ return Optional.of("transaction was already marked for rollback");
+ }
+ else {
+ return empty();
+ }
+ }
+
+ private static String formatErrorMessage(
+ String action, Optional<Xid> xid, Optional<Integer> errorCode, String... more) {
+ return String.format(
+ "unable to %s%s%s%s",
+ action,
+ xid.map(x -> " XA transaction, xid: " + x).orElse(""),
+ errorCode
+ .map(code -> String.format(", error %d: %s", code, descError(code)))
+ .orElse(""),
+ more == null || more.length == 0 ? "" : ". " + Arrays.toString(more));
+ }
+
+ /**
+ * @return error description from {@link XAException} javadoc from to ease debug.
+ */
+ private static String descError(int code) {
+ switch (code) {
+ case XA_HEURCOM:
+ return "heuristic commit decision was made";
+ case XAException.XA_HEURHAZ:
+ return "heuristic decision may have been made";
+ case XAException.XA_HEURMIX:
+ return "heuristic mixed decision was made";
+ case XA_HEURRB:
+ return "heuristic rollback decision was made";
+ case XAException.XA_NOMIGRATE:
+ return "the transaction resumption must happen where the suspension occurred";
+ case XAException.XA_RBCOMMFAIL:
+ return "rollback happened due to a communications failure";
+ case XAException.XA_RBDEADLOCK:
+ return "rollback happened because deadlock was detected";
+ case XAException.XA_RBINTEGRITY:
+ return "rollback happened because an internal integrity check failed";
+ case XAException.XA_RBOTHER:
+ return "rollback happened for some reason not fitting any of the other rollback error codes";
+ case XAException.XA_RBPROTO:
+ return "rollback happened due to a protocol error in the resource manager";
+ case XAException.XA_RBROLLBACK:
+ return "rollback happened for an unspecified reason";
+ case XA_RBTIMEOUT:
+ return "rollback happened because of a timeout";
+ case XA_RBTRANSIENT:
+ return "rollback happened due to a transient failure";
+ case XAException.XA_RDONLY:
+ return "the transaction branch was read-only, and has already been committed";
+ case XAException.XA_RETRY:
+ return "the method invoked returned without having any effect, and that it may be invoked again";
+ case XAException.XAER_ASYNC:
+ return "an asynchronous operation is outstanding";
+ case XAException.XAER_DUPID:
+ return "Xid given as an argument is already known to the resource manager";
+ case XAException.XAER_INVAL:
+ return "invalid arguments were passed";
+ case XAER_NOTA:
+ return "Xid is not valid";
+ case XAException.XAER_OUTSIDE:
+ return "the resource manager is doing work outside the global transaction";
+ case XAException.XAER_PROTO:
+ return "protocol error";
+ case XAException.XAER_RMERR:
+ return "resource manager error has occurred";
+ case XAER_RMFAIL:
+ return "the resource manager has failed and is not available";
+ default:
+ return "";
+ }
+ }
+
+ private static class Command<T> {
+ private final String name;
+ private final Optional<Xid> xid;
+ private final Callable<T> callable;
+ private final Function<XAException, Optional<T>> recover;
+
+ static Command<Object> fromRunnable(
+ String action, Xid xid, ThrowingRunnable<XAException> runnable) {
+ return fromRunnable(
+ action,
+ xid,
+ runnable,
+ e -> {
+ throw wrapException(action, of(xid), e);
+ });
+ }
+
+ static Command<Object> fromRunnableRecoverByWarn(
+ String action,
+ Xid xid,
+ ThrowingRunnable<XAException> runnable,
+ Function<XAException, Optional<String>> err2msg) {
+ return fromRunnable(
+ action,
+ xid,
+ runnable,
+ e ->
+ LOG.warn(
+ formatErrorMessage(
+ action,
+ of(xid),
+ of(e.errorCode),
+ err2msg.apply(e)
+ .orElseThrow(
+ () ->
+ wrapException(
+ action, of(xid), e)))));
+ }
+
+ private static Command<Object> fromRunnable(
+ String action,
+ Xid xid,
+ ThrowingRunnable<XAException> runnable,
+ Consumer<XAException> recover) {
+ return new Command<>(
+ action,
+ of(xid),
+ () -> {
+ runnable.run();
+ return null;
+ },
+ e -> {
+ recover.accept(e);
+ return Optional.of("");
+ });
+ }
+
+ private Command(String name, Optional<Xid> xid, Callable<T> callable) {
+ this(name, xid, callable, e -> empty());
+ }
+
+ private Command(
+ String name,
+ Optional<Xid> xid,
+ Callable<T> callable,
+ Function<XAException, Optional<T>> recover) {
+ this.name = name;
+ this.xid = xid;
+ this.callable = callable;
+ this.recover = recover;
+ }
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
new file mode 100644
index 00000000..e37e6b05
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+ /**
+  * Operations applied to a whole group of XA transactions at once.
+  */
+ public interface XaGroupOps
+     extends Serializable {
+
+     /**
+      * Commits a batch of transactions.
+      *
+      * @param xids transactions to commit
+      * @param allowOutOfOrderCommits whether to go on with the remaining transactions after one
+      *     of them has failed
+      * @param maxCommitAttempts number of attempts after which a transaction that is still
+      *     waiting for a retry is treated as an error
+      * @return the outcome of the batch
+      */
+     GroupXaOperationResult<XidInfo> commit(
+         List<XidInfo> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts);
+
+     /**
+      * Rolls back the given transactions.
+      *
+      * @param xids transactions to roll back
+      */
+     void rollback(List<XidInfo> xids);
+
+     /**
+      * Marks the given transactions as failed and rolls them back.
+      *
+      * @param xids transactions to fail and roll back
+      * @return the outcome of the batch
+      */
+     GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids);
+
+     /**
+      * Rolls back transactions found by recovery.
+      *
+      * @param context job context handed to the xid generator
+      * @param sinkContext writer context handed to the xid generator
+      * @param xidGenerator used to tell which recovered transactions belong to this subtask
+      * @param excludeXid transaction that must be left untouched
+      */
+     void recoverAndRollback(SeaTunnelContext context, SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid);
+
+ }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
new file mode 100644
index 00000000..05ecce16
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+ /**
+  * {@link XaGroupOps} implementation that applies each operation to the transactions one by one
+  * through an {@link XaFacade}.
+  */
+ public class XaGroupOpsImpl
+     implements XaGroupOps {
+
+     private static final long serialVersionUID = 1L;
+
+     private static final Logger LOG = LoggerFactory.getLogger(XaGroupOpsImpl.class);
+
+     private final XaFacade xaFacade;
+
+     public XaGroupOpsImpl(XaFacade xaFacade) {
+         this.xaFacade = xaFacade;
+     }
+
+     /**
+      * Commits the transactions in iteration order.
+      *
+      * <p>Every attempted transaction is removed from {@code xids} (the list must therefore be
+      * modifiable). After the first failure the loop stops unless {@code allowOutOfOrderCommits}
+      * is set; whatever is left in {@code xids} at that point is queued for retry.
+      *
+      * @param xids transactions to commit; emptied of the attempted ones as a side effect
+      * @param allowOutOfOrderCommits whether to keep committing after a failure
+      * @param maxCommitAttempts attempt count at which a transaction queued for retry is fatal
+      * @return the collected outcome
+      * @throws RuntimeException if a transaction queued for retry reached
+      *     {@code maxCommitAttempts}; {@code result.throwIfAnyFailed("commit")} may throw as
+      *     well (presumably for non-transient failures - confirm in GroupXaOperationResult)
+      */
+     @Override
+     public GroupXaOperationResult<XidInfo> commit(
+         List<XidInfo> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
+         GroupXaOperationResult<XidInfo> result = new GroupXaOperationResult<>();
+         int origSize = xids.size();
+         LOG.debug("commit {} transactions", origSize);
+         for (Iterator<XidInfo> i = xids.iterator();
+              i.hasNext() && (result.hasNoFailures() || allowOutOfOrderCommits); ) {
+             XidInfo x = i.next();
+             i.remove();
+             try {
+                 // NOTE(review): ignoreUnknown is hard-coded to false, so a retried commit whose
+                 // first attempt actually went through would fail with XAER_NOTA - confirm that
+                 // this is intended.
+                 xaFacade.commit(x.getXid(), false);
+                 result.succeeded(x);
+             } catch (XaFacade.TransientXaException e) {
+                 result.failedTransiently(x.withAttemptsIncremented(), e);
+             } catch (Exception e) {
+                 result.failed(x, e);
+             }
+         }
+         // Transactions that were not attempted at all are retried later, too.
+         result.getForRetry().addAll(xids);
+         result.throwIfAnyFailed("commit");
+         throwIfAnyReachedMaxAttempts(result, maxCommitAttempts);
+         result.getTransientFailure()
+             .ifPresent(
+                 f ->
+                     LOG.warn(
+                         "failed to commit {} transactions out of {} (keep them to retry later)",
+                         result.getForRetry().size(),
+                         origSize,
+                         f));
+         return result;
+     }
+
+     /** Rolls back the transactions in order; the first failure propagates to the caller. */
+     @Override
+     public void rollback(List<XidInfo> xids) {
+         for (XidInfo x : xids) {
+             xaFacade.rollback(x.getXid());
+         }
+     }
+
+     /**
+      * Fails and rolls back every transaction, recording the outcome of each one instead of
+      * stopping at the first failure.
+      *
+      * @return the collected outcome; an empty result for an empty input
+      */
+     @Override
+     public GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids) {
+         GroupXaOperationResult<XidInfo> result = new GroupXaOperationResult<>();
+         if (xids.isEmpty()) {
+             return result;
+         }
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("rolling back {} transactions: {}", xids.size(), xids);
+         }
+         for (XidInfo x : xids) {
+             try {
+                 xaFacade.failAndRollback(x.getXid());
+                 result.succeeded(x);
+             } catch (XaFacade.TransientXaException e) {
+                 LOG.info("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
+                 result.failedTransiently(x, e);
+             } catch (Exception e) {
+                 LOG.warn("unable to fail/rollback transaction, xid={}: {}", x, e.getMessage());
+                 result.failed(x, e);
+             }
+         }
+         if (!result.getForRetry().isEmpty()) {
+             LOG.info("failed to roll back {} transactions", result.getForRetry().size());
+         }
+         return result;
+     }
+
+     /**
+      * Rolls back the recovered transactions that belong to this subtask, leaving
+      * {@code excludeXid} alone. A failing rollback is logged and does not stop the loop.
+      */
+     @Override
+     public void recoverAndRollback(SeaTunnelContext context, SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
+         // Work on a private mutable copy: the collection handed back by the facade may be
+         // fixed-size or unmodifiable, in which case remove() would throw
+         // UnsupportedOperationException whenever excludeXid is actually present.
+         Collection<Xid> recovered = new ArrayList<>(xaFacade.recover());
+         recovered.remove(excludeXid);
+         if (recovered.isEmpty()) {
+             return;
+         }
+         LOG.warn("rollback {} recovered transactions", recovered.size());
+         for (Xid xid : recovered) {
+             if (xidGenerator.belongsToSubtask(xid, context, sinkContext)) {
+                 try {
+                     xaFacade.rollback(xid);
+                 } catch (Exception e) {
+                     LOG.info("unable to rollback recovered transaction, xid={}", xid, e);
+                 }
+             }
+         }
+     }
+
+     /**
+      * @throws RuntimeException listing every transaction queued for retry whose attempt count
+      *     is at least {@code maxAttempts}
+      */
+     private static void throwIfAnyReachedMaxAttempts(
+         GroupXaOperationResult<XidInfo> result, int maxAttempts) {
+         List<XidInfo> reached = null;
+         for (XidInfo x : result.getForRetry()) {
+             if (x.getAttempts() >= maxAttempts) {
+                 if (reached == null) {
+                     reached = new ArrayList<>();
+                 }
+                 reached.add(x);
+             }
+         }
+         if (reached != null) {
+             throw new RuntimeException(
+                 String.format(
+                     "reached max number of commit attempts (%d) for transactions: %s",
+                     maxAttempts, reached));
+         }
+     }
+ }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
new file mode 100644
index 00000000..a8017505
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.security.SecureRandom;
+
+/**
+ * {@link Xid} generator.
+ */
+public interface XidGenerator
+ extends Serializable, AutoCloseable {
+
+ Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext, long checkpointId);
+
+ default void open() {}
+
+ /**
+ * @return true if the provided transaction belongs to this subtask
+ */
+ boolean belongsToSubtask(Xid xid, SeaTunnelContext context, SinkWriter.Context sinkContext);
+
+ @Override
+ default void close() {}
+
+ /**
+ * Creates a {@link XidGenerator} that generates {@link Xid xids} from:
+ *
+ * <ol>
+ * <li>job id
+ * <li>subtask index
+ * <li>checkpoint id
+ * <li>four random bytes generated using {@link SecureRandom})
+ * </ol>
+ *
+ * <p>Each created {@link XidGenerator} instance MUST be used for only one Sink instance
+ * (otherwise Xids could collide).
+ */
+ static XidGenerator semanticXidGenerator() {
+ return new SemanticXidGenerator();
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java
new file mode 100644
index 00000000..f5b1f057
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A simple {@link Xid} implementation that stores branch and global transaction identifiers as byte
+ * arrays.
+ */
+final class XidImpl implements Xid, Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final char[] HEX_CHARS = {
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
+ };
+
+ private final int formatId;
+ private final byte[] globalTransactionId;
+ private final byte[] branchQualifier;
+
+ public XidImpl(int formatId, byte[] globalTransactionId, byte[] branchQualifier) {
+ checkArgument(globalTransactionId.length <= Xid.MAXGTRIDSIZE);
+ checkArgument(branchQualifier.length <= Xid.MAXBQUALSIZE);
+ this.formatId = formatId;
+ this.globalTransactionId = Arrays.copyOf(globalTransactionId, globalTransactionId.length);
+ this.branchQualifier = Arrays.copyOf(branchQualifier, branchQualifier.length);
+ }
+
+ @Override
+ public int getFormatId() {
+ return formatId;
+ }
+
+ @Override
+ public byte[] getGlobalTransactionId() {
+ return globalTransactionId;
+ }
+
+ @Override
+ public byte[] getBranchQualifier() {
+ return branchQualifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof XidImpl)) {
+ return false;
+ }
+ XidImpl xid = (XidImpl) o;
+ return formatId == xid.formatId
+ && Arrays.equals(globalTransactionId, xid.globalTransactionId)
+ && Arrays.equals(branchQualifier, xid.branchQualifier);
+ }
+
+ @Override
+ public int hashCode() {
+ final int number = 31;
+ int result = Objects.hash(formatId);
+ result = number * result + Arrays.hashCode(globalTransactionId);
+ result = number * result + Arrays.hashCode(branchQualifier);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return formatId
+ + ":"
+ + byteToHexString(globalTransactionId)
+ + ":"
+ + byteToHexString(branchQualifier);
+ }
+
+ /**
+ * Given an array of bytes it will convert the bytes to a hex string representation of the
+ * bytes.
+ *
+ * @param bytes the bytes to convert in a hex string
+ * @param start start index, inclusively
+ * @param end end index, exclusively
+ * @return hex string representation of the byte array
+ */
+ public static String byteToHexString(final byte[] bytes, final int start, final int end) {
+ final int number0xf0 = 0xF0;
+ final int number0x0f = 0x0F;
+ final int number4 = 4;
+ if (bytes == null) {
+ throw new IllegalArgumentException("bytes == null");
+ }
+
+ int length = end - start;
+ char[] out = new char[length * 2];
+
+ for (int i = start, j = 0; i < end; i++) {
+ out[j++] = HEX_CHARS[(number0xf0 & bytes[i]) >>> number4];
+ out[j++] = HEX_CHARS[number0x0f & bytes[i]];
+ }
+
+ return new String(out);
+ }
+
+ /**
+ * Given an array of bytes it will convert the bytes to a hex string representation of the
+ * bytes.
+ *
+ * @param bytes the bytes to convert in a hex string
+ * @return hex string representation of the byte array
+ */
+ public static String byteToHexString(final byte[] bytes) {
+ return byteToHexString(bytes, 0, bytes.length);
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
new file mode 100644
index 00000000..849ca50c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XidGenerator;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+ /**
+  * {@link SinkWriter} that writes rows to a JDBC database inside XA transactions.
+  *
+  * <p>Rows are written within the transaction identified by {@code currentXid}.
+  * {@link #prepareCommit()} flushes and prepares that transaction, publishes its xid as an
+  * {@link XidInfo} and immediately starts the next transaction. The most recently prepared xid
+  * is what {@link #snapshotState(long)} stores.
+  *
+  * <p>The XA facade, the xid generator and the output format are opened lazily on the first
+  * call to {@link #write(SeaTunnelRow)} or {@link #prepareCommit()}.
+  */
+ public class JdbcExactlyOnceSinkWriter
+     implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> {
+     private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkWriter.class);
+
+     private final SinkWriter.Context sinkcontext;
+
+     private final SeaTunnelContext context;
+
+     /** States handed over on restore; empty for a freshly created writer. */
+     private final List<JdbcSinkState> recoverStates;
+
+     private final XaFacade xaFacade;
+
+     private final XaGroupOps xaGroupOps;
+
+     private final XidGenerator xidGenerator;
+
+     private final JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
+
+     private transient boolean isOpen;
+
+     /** Xid of the transaction rows are currently written into; {@code null} when none is active. */
+     private transient Xid currentXid;
+     /** Xid of the transaction prepared by the latest {@link #prepareCommit()}. */
+     private transient Xid prepareXid;
+
+     /**
+      * Creates the writer without opening any connection.
+      *
+      * @param sinkcontext writer context passed to the xid generator and to recovery
+      * @param context job context passed to the xid generator and to recovery
+      * @param statementBuilder fills the prepared statement from a row
+      * @param jdbcConnectorOptions connector options; {@code maxRetries} must be 0
+      * @param states states to recover from, empty when starting fresh
+      * @throws IllegalArgumentException if {@code maxRetries} is not 0
+      */
+     public JdbcExactlyOnceSinkWriter(
+         SinkWriter.Context sinkcontext,
+         SeaTunnelContext context,
+         JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
+         JdbcConnectorOptions jdbcConnectorOptions,
+         List<JdbcSinkState> states) {
+         checkArgument(
+             jdbcConnectorOptions.getMaxRetries() == 0,
+             "JDBC XA sink requires maxRetries equal to 0, otherwise it could "
+                 + "cause duplicates.");
+
+         this.context = context;
+         this.sinkcontext = sinkcontext;
+         this.recoverStates = states;
+         this.xidGenerator = XidGenerator.semanticXidGenerator();
+         this.xaFacade = XaFacade.fromJdbcConnectionOptions(
+             jdbcConnectorOptions);
+
+         this.outputFormat = new JdbcOutputFormat<>(
+             xaFacade,
+             jdbcConnectorOptions,
+             () -> new SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(), statementBuilder));
+
+         this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
+     }
+
+     /**
+      * Opens the xid generator, the XA facade and the output format on first use, rolls back
+      * pending transactions when recover states are present, and starts the first transaction.
+      *
+      * <p>The open flag is set before the resources are opened, so a failed open is not
+      * attempted again by later calls.
+      */
+     private void tryOpen() throws IOException {
+         if (!isOpen) {
+             isOpen = true;
+             try {
+                 xidGenerator.open();
+                 xaFacade.open();
+                 outputFormat.open();
+                 if (!recoverStates.isEmpty()) {
+                     Xid xid = recoverStates.get(0).getXid();
+                     // Rollback pending transactions that should not include recoverStates
+                     xaGroupOps.recoverAndRollback(context, sinkcontext, xidGenerator, xid);
+                 }
+                 beginTx();
+             } catch (Exception e) {
+                 ExceptionUtils.rethrowIOException(e);
+             }
+         }
+     }
+
+     /**
+      * Returns a single state wrapping the most recently prepared xid.
+      *
+      * @throws IllegalStateException if no transaction has been prepared yet
+      */
+     @Override
+     public List<JdbcSinkState> snapshotState(long checkpointId) {
+         checkState(prepareXid != null, "prepare xid must not be null");
+         return Collections.singletonList(new JdbcSinkState(prepareXid));
+     }
+
+     /** Writes a deep copy of the row into the current transaction, opening the writer if needed. */
+     @Override
+     public void write(SeaTunnelRow element)
+         throws IOException {
+         tryOpen();
+         checkState(currentXid != null, "current xid must not be null");
+         SeaTunnelRow copy = SerializationUtils.clone(element);
+         outputFormat.writeRecord(copy);
+     }
+
+     /**
+      * Prepares the current transaction and starts a new one.
+      *
+      * @return the prepared xid wrapped in an {@link XidInfo} with zero attempts
+      */
+     @Override
+     public Optional<XidInfo> prepareCommit()
+         throws IOException {
+         // A commit may be requested before any row was written; make sure a transaction exists
+         // instead of failing the "no current xid" check below.
+         tryOpen();
+         prepareCurrentTx();
+         this.currentXid = null;
+         beginTx();
+         checkState(prepareXid != null, "prepare xid must not be null");
+         return Optional.of(new XidInfo(prepareXid, 0));
+     }
+
+     @Override
+     public void abortPrepare() {
+
+     }
+
+     /**
+      * Rolls back the transaction that is still in progress (best effort, failures are only
+      * logged), then closes the XA facade and the xid generator.
+      */
+     @Override
+     public void close()
+         throws IOException {
+         try {
+             if (currentXid != null && xaFacade.isOpen()) {
+                 try {
+                     LOG.debug("remove current transaction before closing, xid={}", currentXid);
+                     xaFacade.failAndRollback(currentXid);
+                 } catch (Exception e) {
+                     LOG.warn("unable to fail/rollback current transaction, xid={}", currentXid, e);
+                 }
+             }
+             try {
+                 xaFacade.close();
+             } catch (Exception e) {
+                 ExceptionUtils.rethrowIOException(e);
+             }
+         } finally {
+             // Release the generator and drop the xids even when closing the facade failed.
+             xidGenerator.close();
+             currentXid = null;
+             prepareXid = null;
+         }
+     }
+
+     /** Generates a fresh xid and starts an XA transaction for it. */
+     private void beginTx() throws IOException {
+         checkState(currentXid == null, "currentXid not null");
+         currentXid = xidGenerator.generateXid(context, sinkcontext, System.currentTimeMillis());
+         try {
+             xaFacade.start(currentXid);
+         } catch (Exception e) {
+             ExceptionUtils.rethrowIOException(e);
+         }
+     }
+
+     /** Flushes buffered rows, then ends and prepares the current transaction. */
+     private void prepareCurrentTx() throws IOException {
+         checkState(currentXid != null, "no current xid");
+         outputFormat.flush();
+         try {
+             xaFacade.endAndPrepare(currentXid);
+             prepareXid = currentXid;
+         } catch (Exception e) {
+             ExceptionUtils.rethrowIOException(e);
+         }
+     }
+ }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
new file mode 100644
index 00000000..bd37cde4
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+@AutoService(SeaTunnelSink.class)
+public class JdbcSink
+ implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo> {
+
+ private Config pluginConfig;
+
+ private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+ private SeaTunnelContext seaTunnelContext;
+
+ private JdbcConnectorOptions jdbcConnectorOptions;
+
+ @Override
+ public String getPluginName() {
+ return "jdbc";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig)
+ throws PrepareFailException {
+ this.pluginConfig = pluginConfig;
+ this.jdbcConnectorOptions = new JdbcConnectorOptions(this.pluginConfig);
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> createWriter(SinkWriter.Context context)
+ throws IOException {
+ SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> sinkWriter;
+ // TODO SeatunnelTyoeInfo is not good enough to get typesArray
+ JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) -> JdbcUtils.setRecordToStatement(st, null, row);
+ if (jdbcConnectorOptions.isExactlyOnce()) {
+ sinkWriter = new JdbcExactlyOnceSinkWriter(
+ context,
+ seaTunnelContext,
+ statementBuilder,
+ jdbcConnectorOptions,
+ new ArrayList<>()
+ );
+ } else {
+ sinkWriter = new JdbcSinkWriter(
+ context,
+ statementBuilder,
+ jdbcConnectorOptions);
+ }
+
+ return sinkWriter;
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> restoreWriter(SinkWriter.Context context, List<JdbcSinkState> states)
+ throws IOException {
+ if (jdbcConnectorOptions.isExactlyOnce()) {
+ JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) -> JdbcUtils.setRecordToStatement(st, null, row);
+ return new JdbcExactlyOnceSinkWriter(
+ context,
+ seaTunnelContext,
+ statementBuilder,
+ jdbcConnectorOptions,
+ states
+ );
+ }
+ return SeaTunnelSink.super.restoreWriter(context, states);
+ }
+
+ @Override
+ public Optional<SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo>> createAggregatedCommitter()
+ throws IOException {
+ if (jdbcConnectorOptions.isExactlyOnce()) {
+ return Optional.of(new JdbcSinkAggregatedCommitter(jdbcConnectorOptions));
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+ this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+ }
+
+ @Override
+ public SeaTunnelContext getSeaTunnelContext() {
+ return seaTunnelContext;
+ }
+
+ @Override
+ public Optional<Serializer<JdbcAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ }
+
+ @Override
+ public Optional<Serializer<XidInfo>> getCommitInfoSerializer() {
+ return Optional.of(new DefaultSerializer<>());
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
new file mode 100644
index 00000000..e5c9f308
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.GroupXaOperationResult;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+ /**
+  * Aggregated committer for the exactly-once JDBC sink: commits or rolls back groups of prepared
+  * XA transactions through {@link XaGroupOps}. The XA facade is opened lazily on the first
+  * commit or abort.
+  */
+ public class JdbcSinkAggregatedCommitter
+     implements SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo> {
+
+     private final XaFacade xaFacade;
+     private final XaGroupOps xaGroupOps;
+     private final JdbcConnectorOptions jdbcConnectorOptions;
+
+     public JdbcSinkAggregatedCommitter(
+         JdbcConnectorOptions jdbcConnectorOptions
+     ) {
+         this.xaFacade = XaFacade.fromJdbcConnectionOptions(
+             jdbcConnectorOptions);
+         this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
+         this.jdbcConnectorOptions = jdbcConnectorOptions;
+     }
+
+     /** Opens the XA facade unless it reports being open already. */
+     private void tryOpen() throws IOException {
+         if (xaFacade.isOpen()) {
+             return;
+         }
+         try {
+             xaFacade.open();
+         } catch (Exception e) {
+             ExceptionUtils.rethrowIOException(e);
+         }
+     }
+
+     /**
+      * Commits every group and returns, per group, the xids that are reported for retry. Groups
+      * with nothing left to retry are dropped from the result.
+      */
+     @Override
+     public List<JdbcAggregatedCommitInfo> commit(List<JdbcAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
+         tryOpen();
+         return aggregatedCommitInfos.stream()
+             .map(this::commitGroup)
+             .filter(pending -> !pending.getXidInfoList().isEmpty())
+             .collect(Collectors.toList());
+     }
+
+     /** Commits one group and wraps the xids that should be retried. */
+     private JdbcAggregatedCommitInfo commitGroup(JdbcAggregatedCommitInfo group) {
+         GroupXaOperationResult<XidInfo> outcome = xaGroupOps.commit(
+             group.getXidInfoList(),
+             false,
+             jdbcConnectorOptions.getMaxCommitAttempts());
+         return new JdbcAggregatedCommitInfo(outcome.getForRetry());
+     }
+
+     @Override
+     public JdbcAggregatedCommitInfo combine(List<XidInfo> commitInfos) {
+         return new JdbcAggregatedCommitInfo(commitInfos);
+     }
+
+     /** Rolls back every xid of every given group. */
+     @Override
+     public void abort(List<JdbcAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
+         tryOpen();
+         for (JdbcAggregatedCommitInfo group : aggregatedCommitInfo) {
+             xaGroupOps.rollback(group.getXidInfoList());
+         }
+     }
+
+     @Override
+     public void close()
+         throws IOException {
+         try {
+             xaFacade.close();
+         } catch (Exception e) {
+             ExceptionUtils.rethrowIOException(e);
+         }
+     }
+ }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
new file mode 100644
index 00000000..626f379a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+public class JdbcSinkCommitter
+ implements SinkCommitter<XidInfo> {
+ private final XaFacade xaFacade;
+ private final XaGroupOps xaGroupOps;
+ private final JdbcConnectorOptions jdbcConnectorOptions;
+
+ public JdbcSinkCommitter(
+ JdbcConnectorOptions jdbcConnectorOptions
+ )
+ throws IOException {
+ this.jdbcConnectorOptions = jdbcConnectorOptions;
+ this.xaFacade = XaFacade.fromJdbcConnectionOptions(
+ jdbcConnectorOptions);
+ this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
+ try {
+ xaFacade.open();
+ }
+ catch (Exception e) {
+ ExceptionUtils.rethrowIOException(e);
+ }
+ }
+
+ @Override
+ public List<XidInfo> commit(List<XidInfo> committables) {
+ return xaGroupOps
+ .commit(committables, false, jdbcConnectorOptions.getMaxCommitAttempts())
+ .getForRetry();
+ }
+
+ @Override
+ public void abort(List<XidInfo> commitInfos)
+ throws IOException {
+ try {
+ xaGroupOps.rollback(commitInfos);
+ }
+ catch (Exception e) {
+ ExceptionUtils.rethrowIOException(e);
+ }
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
new file mode 100644
index 00000000..548f1073
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+
+import org.apache.commons.lang3.SerializationUtils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+ /**
+  * {@link SinkWriter} for the JDBC sink without XA transactions: rows are handed to a
+  * {@link JdbcOutputFormat}, which is flushed on {@link #prepareCommit()}.
+  *
+  * <p>The output format is opened lazily on the first call to {@link #write(SeaTunnelRow)} or
+  * {@link #prepareCommit()}. The writer keeps no checkpoint state and produces no commit info.
+  */
+ public class JdbcSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> {
+
+     private final JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
+     private final SinkWriter.Context context;
+     private transient boolean isOpen;
+
+     /**
+      * @param context writer context
+      * @param statementBuilder fills the prepared statement from a row
+      * @param jdbcConnectorOptions connector options, including the query to execute
+      */
+     public JdbcSinkWriter(
+         SinkWriter.Context context,
+         JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
+         JdbcConnectorOptions jdbcConnectorOptions) {
+
+         JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectorOptions);
+
+         this.context = context;
+         this.outputFormat = new JdbcOutputFormat<>(
+             connectionProvider,
+             jdbcConnectorOptions,
+             () -> new SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(), statementBuilder));
+     }
+
+     /**
+      * Opens the output format on first use. The flag is set before opening, so a failed open is
+      * not attempted again by later calls.
+      */
+     private void tryOpen() throws IOException {
+         if (!isOpen) {
+             isOpen = true;
+             outputFormat.open();
+         }
+     }
+
+     /** Always returns an empty list. */
+     @Override
+     public List<JdbcSinkState> snapshotState(long checkpointId) {
+         return Collections.emptyList();
+     }
+
+     /** Writes a deep copy of the row to the output format, opening it if needed. */
+     @Override
+     public void write(SeaTunnelRow element)
+         throws IOException {
+         tryOpen();
+         SeaTunnelRow copy = SerializationUtils.clone(element);
+         outputFormat.writeRecord(copy);
+     }
+
+     /**
+      * Flushes the output format.
+      *
+      * @return always {@link Optional#empty()}
+      */
+     @Override
+     public Optional<XidInfo> prepareCommit()
+         throws IOException {
+         // A commit may be requested before any row was written; never flush an unopened format.
+         tryOpen();
+         outputFormat.flush();
+         return Optional.empty();
+     }
+
+     @Override
+     public void abortPrepare() {
+
+     }
+
+     @Override
+     public void close()
+         throws IOException {
+         outputFormat.close();
+     }
+ }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java
new file mode 100644
index 00000000..e04a06dc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+ /**
+ * Serializable holder for a list of {@link XidInfo} entries that are handled as one group.
+ *
+ * <p>The constructor and accessors are generated by Lombok ({@code @AllArgsConstructor},
+ * {@code @Data}).
+ */
+ @Data
+ @AllArgsConstructor
+ public class JdbcAggregatedCommitInfo implements Serializable {
+ // The xids belonging to this group.
+ private final List<XidInfo> xidInfoList;
+ }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java
new file mode 100644
index 00000000..d3261e8c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+
+ /**
+ * Serializable writer state wrapping a single {@link Xid}.
+ *
+ * <p>The constructor and accessors are generated by Lombok ({@code @AllArgsConstructor},
+ * {@code @Data}).
+ */
+ @Data
+ @AllArgsConstructor
+ public class JdbcSinkState implements Serializable {
+ // The xid captured in this state.
+ private final Xid xid;
+ }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java
new file mode 100644
index 00000000..80cbe63f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+
+ /**
+ * Serializable pair of an {@link Xid} and an attempt counter.
+ *
+ * <p>Instances are immutable: {@link #withAttemptsIncremented()} returns a new object instead of
+ * modifying this one. The constructor and accessors are generated by Lombok
+ * ({@code @AllArgsConstructor}, {@code @Data}).
+ */
+ @Data
+ @AllArgsConstructor
+ public class XidInfo implements Serializable {
+
+ // The transaction id this entry refers to.
+ final Xid xid;
+ // Attempt counter carried with the xid; presumably counts commit attempts (confirm with XaGroupOps).
+ final int attempts;
+
+ /**
+ * Returns a copy of this entry for the same xid with {@code attempts} increased by one.
+ *
+ * @return a new {@code XidInfo}; this instance is left unchanged
+ */
+ public XidInfo withAttemptsIncremented() {
+ return new XidInfo(xid, attempts + 1);
+ }
+ }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java
new file mode 100644
index 00000000..f1d66d82
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import java.io.IOException;
+
+ /** Static helpers for re-throwing a {@link Throwable} under a restricted method signature. */
+ public class ExceptionUtils {
+     /**
+      * Re-throws the given {@code Throwable} where no checked exception may be declared.
+      *
+      * <p>A {@link RuntimeException} or an {@link Error} is thrown as is; anything else is wrapped
+      * in a {@link RuntimeException} with the original as its cause.
+      *
+      * @param t The Throwable to be thrown.
+      */
+     public static void rethrow(Throwable t) {
+         if (t instanceof RuntimeException) {
+             throw (RuntimeException) t;
+         }
+         if (t instanceof Error) {
+             throw (Error) t;
+         }
+         throw new RuntimeException(t);
+     }
+
+     /**
+      * Re-throws the given {@code Throwable} where the signature allows only
+      * {@link IOException} (besides {@link RuntimeException} and {@link Error}).
+      *
+      * <p>An {@code IOException}, a {@code RuntimeException} or an {@code Error} is thrown as is.
+      * Anything else is wrapped in an {@code IOException} that carries the original message and
+      * has the original as its cause.
+      *
+      * @param t The Throwable to be thrown.
+      * @throws IOException the original {@code IOException}, or the wrapper described above
+      */
+     public static void rethrowIOException(Throwable t) throws IOException {
+         if (t instanceof Error) {
+             throw (Error) t;
+         }
+         if (t instanceof RuntimeException) {
+             throw (RuntimeException) t;
+         }
+         if (t instanceof IOException) {
+             throw (IOException) t;
+         }
+         throw new IOException(t.getMessage(), t);
+     }
+ }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java
new file mode 100644
index 00000000..b3b32f5c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/** Utils for jdbc connectors. */
+public class JdbcUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);
+
+    /**
+     * Adds a record to the prepared statement.
+     *
+     * <p>Parameters are bound in row order: field {@code i} goes to statement parameter {@code i + 1}.
+     *
+     * <p>A column with a declared type is bound through {@link #setField}; a column without one
+     * (types array {@code null}, empty, or shorter than the row) is bound with setObject.
+     *
+     * <p>WARNING: the untyped path may fail for null values, because it is not guaranteed that
+     * the JDBC driver handles PreparedStatement.setObject(pos, null).
+     *
+     * @param upload The prepared statement.
+     * @param typesArray The jdbc types of the row, may be {@code null} or empty.
+     * @param row The records to add to the output.
+     * @throws SQLException if the statement rejects a parameter.
+     * @see PreparedStatement
+     */
+    public static void setRecordToStatement(PreparedStatement upload, int[] typesArray, SeaTunnelRow row)
+            throws SQLException {
+        final Object[] fields = row.getFields();
+        // An empty array carries no type information, so it is handled like a missing one.
+        final int typedColumns = typesArray == null ? 0 : typesArray.length;
+        if (typedColumns > 0 && typedColumns != fields.length) {
+            LOG.warn(
+                    "Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+        }
+        for (int i = 0; i < fields.length; i++) {
+            if (i < typedColumns) {
+                setField(upload, typesArray[i], fields[i], i);
+            } else {
+                // No declared type for this column: best effort instead of indexing past the array.
+                upload.setObject(i + 1, fields[i]);
+            }
+        }
+    }
+
+    /**
+     * Binds one value to parameter {@code index + 1} of the statement with the setter matching
+     * the jdbc {@code type}. A {@code null} value is bound with setNull; a type without a case
+     * below (e.g. ARRAY, BLOB, CLOB, OTHER, SQLXML) is bound with setObject and logged.
+     *
+     * @param upload The prepared statement.
+     * @param type The jdbc type of the column, one of {@link java.sql.Types}.
+     * @param field The value to bind, may be {@code null}.
+     * @param index The 0-based index of the column in the row.
+     * @throws SQLException if the statement rejects the parameter.
+     * @throws ClassCastException if {@code field} is not of the Java type used for {@code type}.
+     */
+    public static void setField(PreparedStatement upload, int type, Object field, int index)
+            throws SQLException {
+        final int column = index + 1;
+        if (field == null) {
+            upload.setNull(column, type);
+            return;
+        }
+        try {
+            // casting values as suggested by
+            // http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
+            switch (type) {
+                case java.sql.Types.NULL:
+                    upload.setNull(column, type);
+                    break;
+                case java.sql.Types.BOOLEAN:
+                case java.sql.Types.BIT:
+                    upload.setBoolean(column, (boolean) field);
+                    break;
+                case java.sql.Types.CHAR:
+                case java.sql.Types.NCHAR:
+                case java.sql.Types.VARCHAR:
+                case java.sql.Types.LONGVARCHAR:
+                case java.sql.Types.LONGNVARCHAR:
+                    upload.setString(column, (String) field);
+                    break;
+                case java.sql.Types.TINYINT:
+                    upload.setByte(column, (byte) field);
+                    break;
+                case java.sql.Types.SMALLINT:
+                    upload.setShort(column, (short) field);
+                    break;
+                case java.sql.Types.INTEGER:
+                    upload.setInt(column, (int) field);
+                    break;
+                case java.sql.Types.BIGINT:
+                    upload.setLong(column, (long) field);
+                    break;
+                case java.sql.Types.REAL:
+                    upload.setFloat(column, (float) field);
+                    break;
+                case java.sql.Types.FLOAT:
+                case java.sql.Types.DOUBLE:
+                    upload.setDouble(column, (double) field);
+                    break;
+                case java.sql.Types.DECIMAL:
+                case java.sql.Types.NUMERIC:
+                    upload.setBigDecimal(column, (java.math.BigDecimal) field);
+                    break;
+                case java.sql.Types.DATE:
+                    upload.setDate(column, (java.sql.Date) field);
+                    break;
+                case java.sql.Types.TIME:
+                    upload.setTime(column, (java.sql.Time) field);
+                    break;
+                case java.sql.Types.TIMESTAMP:
+                    upload.setTimestamp(column, (java.sql.Timestamp) field);
+                    break;
+                case java.sql.Types.BINARY:
+                case java.sql.Types.VARBINARY:
+                case java.sql.Types.LONGVARBINARY:
+                    upload.setBytes(column, (byte[]) field);
+                    break;
+                default:
+                    upload.setObject(column, field);
+                    LOG.warn(
+                            "Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
+                            type, column, field);
+            }
+        } catch (ClassCastException e) {
+            // enrich the exception with detailed information, keeping the original stack trace.
+            String errorMessage = String.format(
+                    "%s, field index: %s, field value: %s.", e.getMessage(), index, field);
+            ClassCastException enrichedException = new ClassCastException(errorMessage);
+            enrichedException.setStackTrace(e.getStackTrace());
+            throw enrichedException;
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java
new file mode 100644
index 00000000..8119da4d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+
+/**
+ * A {@link Runnable}-like callback whose {@link #run()} may declare a checked exception.
+ *
+ * @param <E> type of Throwable that {@link #run()} is declared to throw.
+ */
+@FunctionalInterface
+public interface ThrowingRunnable<E extends Throwable> {
+
+    /**
+     * Executes the captured block of code.
+     *
+     * @throws E if the block fails.
+     */
+    void run() throws E;
+
+    /**
+     * Adapts a {@link ThrowingRunnable} to a plain {@link Runnable}. Anything the wrapped block
+     * throws is handed to {@link ExceptionUtils#rethrow(Throwable)}.
+     *
+     * @param throwingRunnable block to adapt.
+     * @return a {@link Runnable} that runs {@code throwingRunnable}.
+     */
+    static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
+        return () -> {
+            try {
+                throwingRunnable.run();
+            } catch (Throwable failure) {
+                ExceptionUtils.rethrow(failure);
+            }
+        };
+    }
+}