You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/13 07:27:30 UTC

[incubator-seatunnel] branch api-draft updated: [api-draft][connector] Add SeaTunnel jdbc sink (#1946) (#2009)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 890211c2 [api-draft][connector]  Add SeaTunnel jdbc sink (#1946) (#2009)
890211c2 is described below

commit 890211c238f8d64749e9cded51f770efad3774b4
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Mon Jun 13 15:27:26 2022 +0800

    [api-draft][connector]  Add SeaTunnel jdbc sink (#1946) (#2009)
    
    * Add SeaTunnel jdbc sink (#1946)
    
    * fix code style err
    
    * fix use deprecated method
    
    Co-authored-by: liuli <li...@analysys.com.cn>
    Co-authored-by: Hisoka <10...@qq.com>
---
 .../seatunnel-connectors-seatunnel-dist/pom.xml    |   5 +
 .../seatunnel-connectors-seatunnel/pom.xml         |   1 +
 .../pom.xml                                        |  32 +-
 .../seatunnel/jdbc/internal/JdbcOutputFormat.java  | 257 +++++++++++
 .../jdbc/internal/connection/DataSourceUtils.java  | 104 +++++
 .../connection/JdbcConnectionProvider.java         |  70 +++
 .../connection/SimpleJdbcConnectionProvider.java   | 154 +++++++
 .../internal/executor/BiConsumerWithException.java |  61 +++
 .../executor/JdbcBatchStatementExecutor.java       |  36 ++
 .../internal/executor/JdbcStatementBuilder.java    |  29 ++
 .../executor/SimpleBatchStatementExecutor.java     |  79 ++++
 .../jdbc/internal/options/JdbcConfig.java          |  55 +++
 .../internal/options/JdbcConnectorOptions.java     | 253 +++++++++++
 .../jdbc/internal/xa/GroupXaOperationResult.java   |  80 ++++
 .../jdbc/internal/xa/SemanticXidGenerator.java     | 113 +++++
 .../seatunnel/jdbc/internal/xa/XaFacade.java       | 113 +++++
 .../jdbc/internal/xa/XaFacadeImplAutoLoad.java     | 478 +++++++++++++++++++++
 .../seatunnel/jdbc/internal/xa/XaGroupOps.java     |  43 ++
 .../seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java | 151 +++++++
 .../seatunnel/jdbc/internal/xa/XidGenerator.java   |  62 +++
 .../seatunnel/jdbc/internal/xa/XidImpl.java        | 136 ++++++
 .../jdbc/sink/JdbcExactlyOnceSinkWriter.java       | 189 ++++++++
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   | 142 ++++++
 .../jdbc/sink/JdbcSinkAggregatedCommitter.java     |  91 ++++
 .../seatunnel/jdbc/sink/JdbcSinkCommitter.java     |  70 +++
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |  96 +++++
 .../jdbc/state/JdbcAggregatedCommitInfo.java       |  30 ++
 .../seatunnel/jdbc/state/JdbcSinkState.java        |  31 ++
 .../connectors/seatunnel/jdbc/state/XidInfo.java   |  37 ++
 .../seatunnel/jdbc/utils/ExceptionUtils.java       |  53 +++
 .../connectors/seatunnel/jdbc/utils/JdbcUtils.java | 158 +++++++
 .../seatunnel/jdbc/utils/ThrowingRunnable.java     |  52 +++
 32 files changed, 3251 insertions(+), 10 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index 42a03364..d0013169 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -50,6 +50,11 @@
             <artifactId>seatunnel-connector-seatunnel-hive</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-seatunnel-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-connector-seatunnel-socket</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index 78e60052..86d0b527 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -35,6 +35,7 @@
         <module>seatunnel-connector-seatunnel-console</module>
         <module>seatunnel-connector-seatunnel-fake</module>
         <module>seatunnel-connector-seatunnel-kafka</module>
+        <module>seatunnel-connector-seatunnel-jdbc</module>
         <module>seatunnel-connector-seatunnel-socket</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
similarity index 65%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
index 78e60052..d5c9ad27 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/pom.xml
@@ -21,20 +21,32 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-connectors</artifactId>
+        <artifactId>seatunnel-connectors-seatunnel</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
 
-    <artifactId>seatunnel-connectors-seatunnel</artifactId>
+    <artifactId>seatunnel-connector-seatunnel-jdbc</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+        </dependency>
+
+    </dependencies>
 
-    <modules>
-        <module>seatunnel-connector-seatunnel-hive</module>
-        <module>seatunnel-connector-seatunnel-console</module>
-        <module>seatunnel-connector-seatunnel-fake</module>
-        <module>seatunnel-connector-seatunnel-kafka</module>
-        <module>seatunnel-connector-seatunnel-socket</module>
-    </modules>
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
new file mode 100644
index 00000000..f7738140
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormat.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+/**
+ * Buffers records into a JDBC batch and writes them to the database through a
+ * {@link JdbcBatchStatementExecutor}.
+ *
+ * <p>The batch is executed when {@link #flush()} is called, when the number of buffered records
+ * reaches the configured batch size, from a background timer (only when a non-zero batch interval
+ * is configured and the batch size is not 1), and once more from {@link #close()}.
+ *
+ * <p>{@link #writeRecord(Object)}, {@link #flush()}, {@link #close()} and the background flush
+ * task all synchronize on this instance.
+ *
+ * @param <I> type of the records handed to {@link #writeRecord(Object)}
+ * @param <E> type of the statement executor that batches and executes the records
+ */
+public class JdbcOutputFormat<I, E extends JdbcBatchStatementExecutor<I>>
+    implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcOutputFormat.class);
+
+    protected final JdbcConnectionProvider connectionProvider;
+
+    private final JdbcConnectorOptions jdbcConnectorOptions;
+    private final StatementExecutorFactory<E> statementExecutorFactory;
+
+    private transient E jdbcStatementExecutor;
+    private transient int batchCount = 0;
+    private transient volatile boolean closed = false;
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    // Failure raised by the background flush; surfaced to the caller on the next
+    // writeRecord/flush/close call through checkFlushException().
+    private transient volatile Exception flushException;
+
+    /**
+     * Creates an output format. No connection is opened until {@link #open()} is called.
+     *
+     * @param connectionProvider       provider of the JDBC connection, must not be null
+     * @param jdbcConnectorOptions     batching and retry options, must not be null
+     * @param statementExecutorFactory factory of the statement executor, must not be null
+     */
+    public JdbcOutputFormat(
+        JdbcConnectionProvider connectionProvider,
+        JdbcConnectorOptions jdbcConnectorOptions,
+        StatementExecutorFactory<E> statementExecutorFactory) {
+        this.connectionProvider = checkNotNull(connectionProvider);
+        this.jdbcConnectorOptions = checkNotNull(jdbcConnectorOptions);
+        this.statementExecutorFactory = checkNotNull(statementExecutorFactory);
+    }
+
+    /**
+     * Connects to the target database, prepares the statement executor and, when periodic
+     * flushing is enabled, starts the background flush timer.
+     *
+     * @throws IOException if the connection cannot be established or the statements cannot be
+     *                     prepared
+     */
+    public void open()
+        throws IOException {
+        try {
+            connectionProvider.getOrEstablishConnection();
+        } catch (Exception e) {
+            throw new IOException("unable to open JDBC writer", e);
+        }
+        jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
+
+        if (jdbcConnectorOptions.getBatchIntervalMs() != 0 && jdbcConnectorOptions.getBatchSize() != 1) {
+            // The counter has to live outside the thread factory: creating it inside the
+            // lambda would restart it for every thread and always produce the suffix "-1".
+            final AtomicInteger threadCounter = new AtomicInteger(0);
+            this.scheduler =
+                Executors.newScheduledThreadPool(
+                    1, runnable -> {
+                        Thread thread = new Thread(runnable);
+                        thread.setDaemon(true);
+                        thread.setName("jdbc-upsert-output-format" + "-" + threadCounter.incrementAndGet());
+                        return thread;
+                    });
+            this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                    () -> {
+                        synchronized (JdbcOutputFormat.this) {
+                            if (!closed) {
+                                try {
+                                    flush();
+                                } catch (Exception e) {
+                                    // Remember the failure; it is reported on the caller's thread.
+                                    flushException = e;
+                                }
+                            }
+                        }
+                    },
+                    jdbcConnectorOptions.getBatchIntervalMs(),
+                    jdbcConnectorOptions.getBatchIntervalMs(),
+                    TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Obtains an executor from the factory and prepares its statements on the current connection.
+     *
+     * @param statementExecutorFactory factory supplying the executor
+     * @return the prepared executor
+     * @throws IOException if preparing the statements fails with a {@link SQLException}
+     */
+    private E createAndOpenStatementExecutor(
+        StatementExecutorFactory<E> statementExecutorFactory)
+        throws IOException {
+        E exec = statementExecutorFactory.get();
+        try {
+            exec.prepareStatements(connectionProvider.getConnection());
+        } catch (SQLException e) {
+            throw new IOException("unable to open JDBC writer", e);
+        }
+        return exec;
+    }
+
+    /**
+     * Throws a {@link RuntimeException} if an earlier background flush recorded a failure.
+     */
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to JDBC failed.", flushException);
+        }
+    }
+
+    /**
+     * Adds a record to the current batch and flushes once the configured batch size is reached.
+     *
+     * @param record the record to buffer
+     * @throws IOException if adding the record or flushing the batch fails
+     */
+    public final synchronized void writeRecord(I record)
+        throws IOException {
+        checkFlushException();
+        try {
+            addToBatch(record);
+            batchCount++;
+            if (jdbcConnectorOptions.getBatchSize() > 0
+                && batchCount >= jdbcConnectorOptions.getBatchSize()) {
+                flush();
+            }
+        } catch (Exception e) {
+            throw new IOException("Writing records to JDBC failed.", e);
+        }
+    }
+
+    /**
+     * Hands a single record to the statement executor.
+     *
+     * @param record the record to add
+     * @throws SQLException if the executor rejects the record
+     */
+    protected void addToBatch(I record)
+        throws SQLException {
+        jdbcStatementExecutor.addToBatch(record);
+    }
+
+    /**
+     * Executes the buffered batch, retrying on {@link SQLException}.
+     *
+     * <p>Up to {@code getMaxRetries() + 1} attempts are made. After a failed attempt the
+     * connection is checked and, if it is no longer valid, reestablished together with the
+     * prepared statements; the thread then sleeps {@code 1000 ms * attemptIndex} before the next
+     * attempt (so there is no delay after the first failure).
+     *
+     * @throws IOException if the batch still fails after the last retry, if the connection
+     *                     cannot be reestablished, or if the thread is interrupted while waiting
+     */
+    public synchronized void flush()
+        throws IOException {
+        checkFlushException();
+        final int sleepMs = 1000;
+        for (int i = 0; i <= jdbcConnectorOptions.getMaxRetries(); i++) {
+            try {
+                attemptFlush();
+                batchCount = 0;
+                break;
+            } catch (SQLException e) {
+                LOG.error("JDBC executeBatch error, retry times = {}", i, e);
+                if (i >= jdbcConnectorOptions.getMaxRetries()) {
+                    // NOTE(review): presumably this always throws and ends the loop - confirm
+                    // against ExceptionUtils.
+                    ExceptionUtils.rethrowIOException(e);
+                }
+                try {
+                    if (!connectionProvider.isConnectionValid()) {
+                        updateExecutor(true);
+                    }
+                } catch (Exception exception) {
+                    LOG.error(
+                        "JDBC connection is not valid, and reestablish connection failed.",
+                        exception);
+                    throw new IOException("Reestablish JDBC connection failed", exception);
+                }
+                try {
+                    Thread.sleep(sleepMs * i);
+                } catch (InterruptedException ex) {
+                    // Restore the interrupt flag; the SQL failure is kept as the cause.
+                    Thread.currentThread().interrupt();
+                    throw new IOException(
+                        "unable to flush; interrupted while doing another attempt", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Performs a single batch execution without any retry handling.
+     *
+     * @throws SQLException if executing the batch fails
+     */
+    protected void attemptFlush()
+        throws SQLException {
+        jdbcStatementExecutor.executeBatch();
+    }
+
+    /**
+     * Flushes any buffered records and releases the timer, the statements and the connection.
+     *
+     * <p>The statements and the connection are released even when the final flush fails, so a
+     * failing flush no longer leaks them. Calling this method again only closes the connection
+     * (if any) and re-checks for a recorded background failure.
+     *
+     * @throws RuntimeException if the final flush fails or a background flush failed earlier
+     */
+    public synchronized void close() {
+        try {
+            if (!closed) {
+                closed = true;
+
+                if (this.scheduledFuture != null) {
+                    scheduledFuture.cancel(false);
+                    this.scheduler.shutdown();
+                }
+
+                try {
+                    if (batchCount > 0) {
+                        try {
+                            flush();
+                        } catch (Exception e) {
+                            LOG.warn("Writing records to JDBC failed.", e);
+                            throw new RuntimeException("Writing records to JDBC failed.", e);
+                        }
+                    }
+                } finally {
+                    // Runs on both the success and the failure path of the final flush.
+                    try {
+                        if (jdbcStatementExecutor != null) {
+                            jdbcStatementExecutor.closeStatements();
+                        }
+                    } catch (SQLException e) {
+                        LOG.warn("Close JDBC writer failed.", e);
+                    }
+                }
+            }
+        } finally {
+            connectionProvider.closeConnection();
+        }
+        checkFlushException();
+    }
+
+    /**
+     * Closes the current statements and prepares them again.
+     *
+     * @param reconnect {@code true} to prepare on a freshly reestablished connection,
+     *                  {@code false} to reuse the existing one
+     * @throws SQLException           if closing or preparing the statements fails
+     * @throws ClassNotFoundException if the driver class cannot be found while reconnecting
+     */
+    public void updateExecutor(boolean reconnect)
+        throws SQLException, ClassNotFoundException {
+        jdbcStatementExecutor.closeStatements();
+        jdbcStatementExecutor.prepareStatements(
+            reconnect ? connectionProvider.reestablishConnection() : connectionProvider.getConnection());
+    }
+
+    /**
+     * Returns the connection currently held by the connection provider.
+     *
+     * @return the existing connection as reported by the provider
+     */
+    @VisibleForTesting
+    public Connection getConnection() {
+        return connectionProvider.getConnection();
+    }
+
+    /**
+     * A factory for creating {@link JdbcBatchStatementExecutor} instance.
+     *
+     * @param <T> The type of instance.
+     */
+    public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
+        extends Supplier<T>, Serializable {}
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
new file mode 100644
index 00000000..1e2e213a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/DataSourceUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+
+import com.google.common.base.CaseFormat;
+import lombok.NonNull;
+
+import javax.sql.CommonDataSource;
+import javax.sql.DataSource;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Reflection helpers that create a {@link CommonDataSource} from {@link JdbcConnectorOptions}.
+ */
+public class DataSourceUtils
+    implements Serializable {
+    private static final String GETTER_PREFIX = "get";
+
+    private static final String SETTER_PREFIX = "set";
+
+    /**
+     * Instantiates the data source class named by the options and applies the URL and, when
+     * present, the user name and password through the matching setter methods.
+     *
+     * @param jdbcConnectorOptions options providing the class name and the access settings
+     * @return the configured data source
+     * @throws InvocationTargetException if a setter throws
+     * @throws IllegalAccessException    if a setter is not accessible
+     */
+    public static CommonDataSource buildCommonDataSource(@NonNull JdbcConnectorOptions jdbcConnectorOptions)
+        throws InvocationTargetException, IllegalAccessException {
+        CommonDataSource dataSource = (CommonDataSource) loadDataSource(jdbcConnectorOptions.getXaDataSourceClassName());
+        setProperties(dataSource, buildDatabaseAccessConfig(jdbcConnectorOptions));
+        return dataSource;
+    }
+
+    /**
+     * Collects the bean properties ("url", "user", "password") to apply to the data source.
+     * User name and password are only included when the options provide them.
+     */
+    private static Map<String, Object> buildDatabaseAccessConfig(JdbcConnectorOptions jdbcConnectorOptions) {
+        HashMap<String, Object> accessConfig = new HashMap<>();
+        accessConfig.put("url", jdbcConnectorOptions.getUrl());
+        if (jdbcConnectorOptions.getUsername().isPresent()) {
+            accessConfig.put("user", jdbcConnectorOptions.getUsername().get());
+        }
+        if (jdbcConnectorOptions.getPassword().isPresent()) {
+            accessConfig.put("password", jdbcConnectorOptions.getPassword().get());
+        }
+
+        return accessConfig;
+    }
+
+    /**
+     * Invokes the setter of every configured property on the data source. Properties without a
+     * matching one-argument setter are skipped.
+     */
+    private static void setProperties(final CommonDataSource commonDataSource, final Map<String, Object> databaseAccessConfig)
+        throws InvocationTargetException, IllegalAccessException {
+        // getMethods() builds a new array on every call, so look it up once for all properties.
+        final Method[] publicMethods = commonDataSource.getClass().getMethods();
+        for (Map.Entry<String, Object> entry : databaseAccessConfig.entrySet()) {
+            Optional<Method> setter = findSetterMethod(publicMethods, entry.getKey());
+            if (setter.isPresent()) {
+                setter.get().invoke(commonDataSource, entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Looks up the public no-argument getter of a property and makes it accessible.
+     * This helper is not called from within this class.
+     */
+    private static Method findGetterMethod(final DataSource dataSource, final String propertyName)
+        throws NoSuchMethodException {
+        String getterMethodName = GETTER_PREFIX + CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, propertyName);
+        Method result = dataSource.getClass().getMethod(getterMethodName);
+        result.setAccessible(true);
+        return result;
+    }
+
+    /**
+     * Finds the first method named {@code set<Property>} that takes exactly one parameter.
+     */
+    private static Optional<Method> findSetterMethod(final Method[] methods, final String property) {
+        String setterMethodName = SETTER_PREFIX + CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, property);
+        return Arrays.stream(methods)
+            .filter(each -> each.getName().equals(setterMethodName) && 1 == each.getParameterTypes().length)
+            .findFirst();
+    }
+
+    /**
+     * Loads the data source class and creates an instance through its no-argument constructor.
+     *
+     * <p>The thread context class loader is tried first, when the thread has one; otherwise, or
+     * when it cannot find the class, {@link Class#forName(String)} is used.
+     *
+     * @throws RuntimeException if the class cannot be loaded or instantiated
+     */
+    private static Object loadDataSource(final String xaDataSourceClassName) {
+        Class<?> xaDataSourceClass = null;
+        // A thread may have no context class loader; guard against the resulting NPE.
+        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        if (contextClassLoader != null) {
+            try {
+                xaDataSourceClass = contextClassLoader.loadClass(xaDataSourceClassName);
+            } catch (final ClassNotFoundException ignored) {
+                // Not visible to the context class loader; fall back to Class.forName below.
+            }
+        }
+        if (xaDataSourceClass == null) {
+            try {
+                xaDataSourceClass = Class.forName(xaDataSourceClassName);
+            } catch (final ClassNotFoundException ex) {
+                throw new RuntimeException("Failed to load [" + xaDataSourceClassName + "]", ex);
+            }
+        }
+        try {
+            return xaDataSourceClass.getDeclaredConstructor().newInstance();
+        } catch (final ReflectiveOperationException ex) {
+            throw new RuntimeException("Failed to instance [" + xaDataSourceClassName + "]", ex);
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java
new file mode 100644
index 00000000..94c10dc2
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/JdbcConnectionProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Supplies, validates and manages the lifecycle of a JDBC {@link Connection}.
+ */
+public interface JdbcConnectionProvider {
+
+    /**
+     * Returns the connection currently held by this provider.
+     *
+     * @return the existing connection
+     */
+    Connection getConnection();
+
+    /**
+     * Checks, by means of {@link Connection#isValid(int)}, whether a possibly existing
+     * connection is still usable.
+     *
+     * @return {@code true} if the existing connection is valid
+     * @throws SQLException if {@link Connection#isValid(int)} fails
+     */
+    boolean isConnectionValid() throws SQLException;
+
+    /**
+     * Returns the existing connection, or establishes a new one if there is none.
+     *
+     * @return the existing or the newly established connection
+     * @throws SQLException           if the connection cannot be established
+     * @throws ClassNotFoundException if the driver class cannot be found
+     */
+    Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;
+
+    /**
+     * Closes the existing connection, if there is one.
+     */
+    void closeConnection();
+
+    /**
+     * Closes the existing connection, if there is one, and establishes a new one.
+     *
+     * @return the newly established connection
+     * @throws SQLException           if the connection cannot be established
+     * @throws ClassNotFoundException if the driver class cannot be found
+     */
+    Connection reestablishConnection() throws SQLException, ClassNotFoundException;
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
new file mode 100644
index 00000000..7e69e990
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Enumeration;
+import java.util.Properties;
+
+/**
+ * A {@link JdbcConnectionProvider} that holds at most one connection, opened directly through
+ * the configured JDBC {@link Driver}.
+ */
+public class SimpleJdbcConnectionProvider
+    implements JdbcConnectionProvider, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private final JdbcConnectorOptions jdbcOptions;
+
+    private transient Driver loadedDriver;
+    private transient Connection connection;
+
+    static {
+        // Touch DriverManager up front. On JDK 8 its static initializer loads drivers, and two
+        // driver classes being loaded concurrently via Class.forName while DriverManager is
+        // still uninitialized can deadlock against it. Since JDK 9 driver loading is no longer
+        // part of DriverManager's static initializer, so this only matters for JDK 8.
+        DriverManager.getDrivers();
+    }
+
+    /**
+     * Creates a provider; no connection is opened yet.
+     *
+     * @param jdbcOptions connection settings (driver, URL, credentials, validity timeout)
+     */
+    public SimpleJdbcConnectionProvider(@NonNull JdbcConnectorOptions jdbcOptions) {
+        this.jdbcOptions = jdbcOptions;
+    }
+
+    @Override
+    public Connection getConnection() {
+        return connection;
+    }
+
+    @Override
+    public boolean isConnectionValid()
+        throws SQLException {
+        if (connection == null) {
+            return false;
+        }
+        return connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
+    }
+
+    /**
+     * Resolves the driver with the given class name: first among the drivers registered with
+     * {@link DriverManager}, otherwise by instantiating the class from the context class loader.
+     */
+    private static Driver loadDriver(String driverName)
+        throws SQLException, ClassNotFoundException {
+        checkNotNull(driverName);
+        for (Enumeration<Driver> registered = DriverManager.getDrivers(); registered.hasMoreElements();) {
+            Driver candidate = registered.nextElement();
+            if (candidate.getClass().getName().equals(driverName)) {
+                return candidate;
+            }
+        }
+
+        // Not registered. Possible reasons:
+        // * Class loader hell of DriverManager(see JDK-8146872).
+        // * driver is not installed as a service provider.
+        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
+        Class<?> driverClass = Class.forName(driverName, true, contextLoader);
+        try {
+            return (Driver) driverClass.getDeclaredConstructor().newInstance();
+        } catch (Exception ex) {
+            throw new SQLException("Fail to create driver of class " + driverName, ex);
+        }
+    }
+
+    /**
+     * Returns the driver, loading and caching it on first use.
+     */
+    private Driver getLoadedDriver()
+        throws SQLException, ClassNotFoundException {
+        if (loadedDriver != null) {
+            return loadedDriver;
+        }
+        loadedDriver = loadDriver(jdbcOptions.getDriverName());
+        return loadedDriver;
+    }
+
+    @Override
+    public Connection getOrEstablishConnection()
+        throws SQLException, ClassNotFoundException {
+        if (connection == null) {
+            Driver driver = getLoadedDriver();
+            Properties credentials = new Properties();
+            if (jdbcOptions.getUsername().isPresent()) {
+                credentials.setProperty("user", jdbcOptions.getUsername().get());
+            }
+            if (jdbcOptions.getPassword().isPresent()) {
+                credentials.setProperty("password", jdbcOptions.getPassword().get());
+            }
+            connection = driver.connect(jdbcOptions.getUrl(), credentials);
+            if (connection == null) {
+                // Mirror the exception DriverManager.getConnection raises when no driver
+                // accepts the URL, so callers see the error they expect.
+                throw new SQLException(
+                    "No suitable driver found for " + jdbcOptions.getUrl(), "08001");
+            }
+        }
+        return connection;
+    }
+
+    @Override
+    public void closeConnection() {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (SQLException e) {
+            LOG.warn("JDBC connection close failed.", e);
+        } finally {
+            // Drop the reference whether or not close() succeeded.
+            connection = null;
+        }
+    }
+
+    @Override
+    public Connection reestablishConnection()
+        throws SQLException, ClassNotFoundException {
+        closeConnection();
+        return getOrEstablishConnection();
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java
new file mode 100644
index 00000000..24d9d17d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import java.util.function.BiConsumer;
+
+/**
+ * A two-argument operation that returns no result and is allowed to throw a checked exception;
+ * the checked counterpart of {@link BiConsumer}.
+ *
+ * <p>The interface declares exactly one abstract method, {@link #accept}, so it can be the target
+ * of a lambda expression or method reference.
+ *
+ * @param <T> type of the first argument
+ * @param <U> type of the second argument
+ * @param <E> type of the thrown exception
+ */
+@FunctionalInterface
+public interface BiConsumerWithException<T, U, E extends Throwable> {
+
+    /**
+     * Performs this operation on the given arguments.
+     *
+     * @param t the first input argument
+     * @param u the second input argument
+     * @throws E in case of an error
+     */
+    void accept(T t, U u) throws E;
+
+    /**
+     * Adapts a {@link BiConsumerWithException} to a plain {@link BiConsumer}.
+     *
+     * <p>The returned consumer delegates to {@code biConsumerWithException}; any {@link Throwable}
+     * raised by the delegate is handed to {@link ExceptionUtils#rethrow(Throwable)} so that the
+     * adapter itself declares no checked exception.
+     *
+     * @param biConsumerWithException BiConsumer with exception to convert into a {@link
+     *     BiConsumer}.
+     * @param <A> first input type
+     * @param <B> second input type
+     * @return {@link BiConsumer} which rethrows all checked exceptions as unchecked.
+     */
+    static <A, B> BiConsumer<A, B> unchecked(
+            BiConsumerWithException<A, B, ?> biConsumerWithException) {
+        return (A a, B b) -> {
+            try {
+                biConsumerWithException.accept(a, b);
+            } catch (Throwable t) {
+                ExceptionUtils.rethrow(t);
+            }
+        };
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java
new file mode 100644
index 00000000..ec0bde8d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcBatchStatementExecutor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Executes the given JDBC statement in batch for the accumulated records.
+ *
+ * <p>The methods mirror the life cycle of a batch: statements are created from a connection,
+ * records are added, the batch is submitted, and finally the statements are closed.
+ *
+ * @param <T> type of the records added to the batch
+ */
+public interface JdbcBatchStatementExecutor<T> {
+
+    /**
+     * Create statements from connection.
+     *
+     * @param connection the JDBC connection used to create the statements
+     * @throws SQLException if the statements cannot be created
+     */
+    void prepareStatements(Connection connection) throws SQLException;
+
+    /**
+     * Adds one record to the batch that a later {@link #executeBatch()} call submits.
+     *
+     * @param record the record to add
+     * @throws SQLException if the record cannot be added
+     */
+    void addToBatch(T record) throws SQLException;
+
+    /**
+     * Submits a batch of commands to the database for execution.
+     *
+     * @throws SQLException if the submission fails
+     */
+    void executeBatch() throws SQLException;
+
+    /**
+     * Close JDBC related statements.
+     *
+     * @throws SQLException if closing fails
+     */
+    void closeStatements() throws SQLException;
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java
new file mode 100644
index 00000000..f8b778c4
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/**
+ * Sets the parameters of a {@link PreparedStatement} from a single record of type {@code T}.
+ *
+ * <p>This is a {@link BiConsumerWithException} taking the statement and the record and allowed to
+ * throw {@link SQLException}; it is also {@link Serializable}. It adds no methods of its own.
+ *
+ * @param <T> type of the record whose values are bound to the statement
+ */
+public interface JdbcStatementBuilder<T>
+        extends BiConsumerWithException<PreparedStatement, T, SQLException>, Serializable {}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
new file mode 100644
index 00000000..bb4340b5
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/SimpleBatchStatementExecutor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link JdbcBatchStatementExecutor} that executes the supplied statement for the given records
+ * (without any pre-processing).
+ *
+ * <p>Records are buffered in memory by {@link #addToBatch} and only bound to the prepared
+ * statement when {@link #executeBatch()} runs. The buffer is emptied only after the statement
+ * batch was executed successfully, so a failed {@link #executeBatch()} can be called again with
+ * the same records.
+ *
+ * @param <T> type of the buffered records
+ */
+public class SimpleBatchStatementExecutor<T> implements JdbcBatchStatementExecutor<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleBatchStatementExecutor.class);
+
+    private final String sql;
+    private final JdbcStatementBuilder<T> parameterSetter;
+    private final List<T> batch;
+
+    private transient PreparedStatement st;
+
+    /**
+     * @param sql the SQL text handed to {@link Connection#prepareStatement(String)}
+     * @param statementBuilder binds the values of one record to the prepared statement
+     */
+    public SimpleBatchStatementExecutor(
+        String sql, JdbcStatementBuilder<T> statementBuilder) {
+        this.sql = sql;
+        this.parameterSetter = statementBuilder;
+        this.batch = new ArrayList<>();
+    }
+
+    /** Prepares the configured SQL on {@code connection}, replacing the statement reference. */
+    @Override
+    public void prepareStatements(Connection connection) throws SQLException {
+        this.st = connection.prepareStatement(sql);
+    }
+
+    /** Buffers {@code record} in memory; nothing is sent to the database yet. */
+    @Override
+    public void addToBatch(T record) {
+        batch.add(record);
+    }
+
+    /**
+     * Binds every buffered record to the prepared statement, executes the statement batch and, on
+     * success, empties the buffer. Does nothing when no record is buffered.
+     *
+     * @throws SQLException if binding a record or executing the batch fails; the buffered records
+     *     are kept in that case
+     */
+    @Override
+    public void executeBatch() throws SQLException {
+        if (batch.isEmpty()) {
+            return;
+        }
+        // All pending records still live in `batch`. If an earlier attempt failed part-way, rows
+        // it already queued on the statement would otherwise be queued a second time below.
+        st.clearBatch();
+        for (T r : batch) {
+            parameterSetter.accept(st, r);
+            st.addBatch();
+        }
+        st.executeBatch();
+        batch.clear();
+    }
+
+    /** Closes the prepared statement, if one is held, and drops the reference to it. */
+    @Override
+    public void closeStatements() throws SQLException {
+        if (st != null) {
+            st.close();
+            st = null;
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java
new file mode 100644
index 00000000..01348508
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options;
+
+import java.io.Serializable;
+
+/** Names of the configuration keys used by the JDBC connector. */
+public class JdbcConfig implements Serializable {
+
+    // ---- connection ----
+    public static final String URL = "url";
+    public static final String DRIVER = "driver";
+    public static final String USER = "user";
+    public static final String PASSWORD = "password";
+    public static final String CONNECTION_CHECK_TIMEOUT_SEC = "connection_check_timeout_sec";
+    public static final String MAX_RETRIES = "max_retries";
+
+    // ---- statement ----
+    public static final String QUERY = "query";
+
+    // ---- execution ----
+    public static final String PARALLELISM = "parallelism";
+
+    // ---- batching ----
+    public static final String BATCH_SIZE = "batch_size";
+    public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
+
+    // ---- exactly-once (XA) ----
+    public static final String IS_EXACTLY_ONCE = "is_exactly_once";
+    public static final String XA_DATA_SOURCE_CLASS_NAME = "xa_data_source_class_name";
+    public static final String MAX_COMMIT_ATTEMPTS = "max_commit_attempts";
+    public static final String TRANSACTION_TIMEOUT_SEC = "transaction_timeout_sec";
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java
new file mode 100644
index 00000000..7b1207f1
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/options/JdbcConnectorOptions.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.NonNull;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+public class JdbcConnectorOptions
+    implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private static final int DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC = 30;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_BATCH_SIZE = 300;
+    private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
+    private static final boolean DEFAULT_IS_EXACTLY_ONCE = false;
+    private static final int DEFAULT_MAX_COMMIT_ATTEMPTS = 3;
+    private static final int DEFAULT_TRANSACTION_TIMEOUT_SEC = -1;
+
+    private String url;
+    private String driverName;
+    private int connectionCheckTimeoutSeconds = DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC;
+    private int maxRetries = DEFAULT_MAX_RETRIES;
+    private String username;
+    private String password;
+    private String query;
+
+    private int batchSize = DEFAULT_BATCH_SIZE;
+    private int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+
+    private boolean isExactlyOnce = DEFAULT_IS_EXACTLY_ONCE;
+    private String xaDataSourceClassName;
+
+    private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+
+    private int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC;
+
+    private JdbcConnectorOptions() {
+    }
+
+    public JdbcConnectorOptions(@NonNull Config config) {
+        this.url = config.getString(JdbcConfig.URL);
+        this.driverName = config.getString(JdbcConfig.DRIVER);
+        if (config.hasPath(JdbcConfig.USER)) {
+            this.username = config.getString(JdbcConfig.USER);
+        }
+        if (config.hasPath(JdbcConfig.PASSWORD)) {
+            this.password = config.getString(JdbcConfig.PASSWORD);
+        }
+        this.query = config.getString(JdbcConfig.QUERY);
+
+        if (config.hasPath(JdbcConfig.MAX_RETRIES)) {
+            this.maxRetries = config.getInt(JdbcConfig.MAX_RETRIES);
+        }
+        if (config.hasPath(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC)) {
+            this.connectionCheckTimeoutSeconds = config.getInt(JdbcConfig.CONNECTION_CHECK_TIMEOUT_SEC);
+        }
+        if (config.hasPath(JdbcConfig.BATCH_SIZE)) {
+            this.batchSize = config.getInt(JdbcConfig.BATCH_SIZE);
+        }
+        if (config.hasPath(JdbcConfig.BATCH_INTERVAL_MS)) {
+            this.batchIntervalMs = config.getInt(JdbcConfig.BATCH_INTERVAL_MS);
+        }
+
+        if (config.hasPath(JdbcConfig.IS_EXACTLY_ONCE)) {
+            this.isExactlyOnce = true;
+            this.xaDataSourceClassName = config.getString(JdbcConfig.XA_DATA_SOURCE_CLASS_NAME);
+            if (config.hasPath(JdbcConfig.MAX_COMMIT_ATTEMPTS)) {
+                this.maxCommitAttempts = config.getInt(JdbcConfig.MAX_COMMIT_ATTEMPTS);
+            }
+            if (config.hasPath(JdbcConfig.TRANSACTION_TIMEOUT_SEC)) {
+                this.transactionTimeoutSec = config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC);
+            }
+        }
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public String getDriverName() {
+        return driverName;
+    }
+
+    public int getConnectionCheckTimeoutSeconds() {
+        return connectionCheckTimeoutSeconds;
+    }
+
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+
+    public Optional<String> getUsername() {
+        return Optional.ofNullable(username);
+    }
+
+    public Optional<String> getPassword() {
+        return Optional.ofNullable(password);
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public int getBatchIntervalMs() {
+        return batchIntervalMs;
+    }
+
+    public boolean isExactlyOnce() {
+        return isExactlyOnce;
+    }
+
+    public String getXaDataSourceClassName() {
+        return xaDataSourceClassName;
+    }
+
+    public int getMaxCommitAttempts() {
+        return maxCommitAttempts;
+    }
+
+    public Optional<Integer> getTransactionTimeoutSec() {
+        return transactionTimeoutSec < 0 ? Optional.empty() : Optional.of(transactionTimeoutSec);
+    }
+
+    public static JdbcConnectorOptionsBuilder builder() {
+        return new JdbcConnectorOptionsBuilder();
+    }
+
+    public static final class JdbcConnectorOptionsBuilder {
+        private String url;
+        private String driverName;
+        private int connectionCheckTimeoutSeconds = DEFAULT_CONNECTION_CHECK_TIMEOUT_SEC;
+        private int maxRetries = DEFAULT_MAX_RETRIES;
+        private String username;
+        private String password;
+        private String query;
+        private int batchSize = DEFAULT_BATCH_SIZE;
+        private int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
+        private boolean isExactlyOnce = DEFAULT_IS_EXACTLY_ONCE;
+        private String xaDataSourceClassName;
+        private int maxCommitAttempts = DEFAULT_MAX_COMMIT_ATTEMPTS;
+        private int transactionTimeoutSec = DEFAULT_TRANSACTION_TIMEOUT_SEC;
+
+        private JdbcConnectorOptionsBuilder() {
+        }
+
+        public JdbcConnectorOptionsBuilder withUrl(String url) {
+            this.url = url;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withDriverName(String driverName) {
+            this.driverName = driverName;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withConnectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
+            this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withMaxRetries(int maxRetries) {
+            this.maxRetries = maxRetries;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withUsername(String username) {
+            this.username = username;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withPassword(String password) {
+            this.password = password;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withQuery(String query) {
+            this.query = query;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withBatchIntervalMs(int batchIntervalMs) {
+            this.batchIntervalMs = batchIntervalMs;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withIsExactlyOnce(boolean isExactlyOnce) {
+            this.isExactlyOnce = isExactlyOnce;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withXaDataSourceClassName(String xaDataSourceClassName) {
+            this.xaDataSourceClassName = xaDataSourceClassName;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withMaxCommitAttempts(int maxCommitAttempts) {
+            this.maxCommitAttempts = maxCommitAttempts;
+            return this;
+        }
+
+        public JdbcConnectorOptionsBuilder withTransactionTimeoutSec(int transactionTimeoutSec) {
+            this.transactionTimeoutSec = transactionTimeoutSec;
+            return this;
+        }
+
+        public JdbcConnectorOptions build() {
+            JdbcConnectorOptions jdbcConnectorOptions = new JdbcConnectorOptions();
+            jdbcConnectorOptions.batchSize = this.batchSize;
+            jdbcConnectorOptions.batchIntervalMs = this.batchIntervalMs;
+            jdbcConnectorOptions.driverName = this.driverName;
+            jdbcConnectorOptions.maxRetries = this.maxRetries;
+            jdbcConnectorOptions.password = this.password;
+            jdbcConnectorOptions.connectionCheckTimeoutSeconds = this.connectionCheckTimeoutSeconds;
+            jdbcConnectorOptions.query = this.query;
+            jdbcConnectorOptions.url = this.url;
+            jdbcConnectorOptions.username = this.username;
+            jdbcConnectorOptions.transactionTimeoutSec = this.transactionTimeoutSec;
+            jdbcConnectorOptions.maxCommitAttempts = this.maxCommitAttempts;
+            jdbcConnectorOptions.isExactlyOnce = this.isExactlyOnce;
+            jdbcConnectorOptions.xaDataSourceClassName = this.xaDataSourceClassName;
+            return jdbcConnectorOptions;
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java
new file mode 100644
index 00000000..34952249
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/GroupXaOperationResult.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class GroupXaOperationResult<T> {
+    private final List<T> succeeded = new ArrayList<>();
+    private final List<T> failed = new ArrayList<>();
+    private final List<T> toRetry = new ArrayList<>();
+    private Optional<Exception> failure = Optional.empty();
+    private Optional<Exception> transientFailure = Optional.empty();
+
+    void failedTransiently(T x, XaFacade.TransientXaException e) {
+        toRetry.add(x);
+        transientFailure =
+            getTransientFailure().isPresent() ? getTransientFailure() : Optional.of(e);
+    }
+
+    void failed(T x, Exception e) {
+        failed.add(x);
+        failure = failure.isPresent() ? failure : Optional.of(e);
+    }
+
+    void succeeded(T x) {
+        succeeded.add(x);
+    }
+
+    private RuntimeException wrapFailure(
+        Exception error, String formatWithCounts, int errCount) {
+        return new RuntimeException(
+            String.format(formatWithCounts, errCount, total()), error);
+    }
+
+    private int total() {
+        return succeeded.size() + failed.size() + toRetry.size();
+    }
+
+    public List<T> getForRetry() {
+        return toRetry;
+    }
+
+    Optional<Exception> getTransientFailure() {
+        return transientFailure;
+    }
+
+    boolean hasNoFailures() {
+        return !failure.isPresent() && !transientFailure.isPresent();
+    }
+
+    void throwIfAnyFailed(String action) {
+        failure.map(
+            f ->
+                    wrapFailure(
+                        f,
+                        "failed to " + action + " %d transactions out of %d",
+                        toRetry.size() + failed.size()))
+            .ifPresent(
+                f -> {
+                    throw f;
+                });
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
new file mode 100644
index 00000000..98825a57
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/SemanticXidGenerator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+
+import javax.transaction.xa.Xid;
+
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Arrays;
+
+/**
+ * Generates {@link Xid}s whose parts are laid out as follows.
+ *
+ * <p>The global transaction id (gtrid) is built to be unique across jobs, subtasks and
+ * checkpoints and consists of:
+ *
+ * <ol>
+ *   <li>job id (32 bytes; shorter ids are padded with trailing zero bytes)
+ *   <li>subtask index (4 bytes)
+ *   <li>checkpoint id (8 bytes)
+ * </ol>
+ *
+ * <p>The branch qualifier (bqual) consists of 4 random bytes (generated using
+ * {@link SecureRandom}) that are drawn once in {@link #open()}.
+ *
+ * <p>Each {@link SemanticXidGenerator} instance MUST be used for only one Sink (otherwise Xids will
+ * collide).
+ */
+class SemanticXidGenerator
+    implements XidGenerator {
+    private static final long serialVersionUID = 1L;
+
+    private static final SecureRandom SECURE_RANDOM = new SecureRandom();
+
+    private static final int JOB_ID_BYTES = 32;
+    private static final int FORMAT_ID = 201;
+
+    private transient byte[] gtridBuffer;
+    private transient byte[] bqualBuffer;
+
+    /** Allocates the gtrid buffer and draws the random branch qualifier. */
+    @Override
+    public void open() {
+        // globalTransactionId = job id + task index + checkpoint id
+        gtridBuffer = new byte[JOB_ID_BYTES + Integer.BYTES + Long.BYTES];
+        // branchQualifier = random bytes
+        bqualBuffer = getRandomBytes(Integer.BYTES);
+    }
+
+    /**
+     * Builds the Xid for the given job, subtask and checkpoint. {@link #open()} must have been
+     * called before.
+     *
+     * @throws IllegalArgumentException if the encoded job id is longer than 32 bytes
+     */
+    @Override
+    public Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext, long checkpointId) {
+        byte[] jobIdBytes = encodeJobId(context);
+        checkArgument(jobIdBytes.length <= JOB_ID_BYTES);
+        // Copy only the bytes that exist and zero the remainder of the job id slot; copying a
+        // fixed JOB_ID_BYTES would read past the end of a shorter job id.
+        System.arraycopy(jobIdBytes, 0, gtridBuffer, 0, jobIdBytes.length);
+        Arrays.fill(gtridBuffer, jobIdBytes.length, JOB_ID_BYTES, (byte) 0);
+
+        writeNumber(sinkContext.getIndexOfSubtask(), Integer.BYTES, gtridBuffer, JOB_ID_BYTES);
+        writeNumber(checkpointId, Long.BYTES, gtridBuffer, JOB_ID_BYTES + Integer.BYTES);
+        // relying on arrays copying inside XidImpl constructor
+        return new XidImpl(FORMAT_ID, gtridBuffer, bqualBuffer);
+    }
+
+    /**
+     * Tells whether {@code xid} carries this generator's format id and this job's id, and is not
+     * attributed to a different subtask index that is still within the current parallelism.
+     */
+    @Override
+    public boolean belongsToSubtask(Xid xid, SeaTunnelContext context, SinkWriter.Context sinkContext) {
+        if (xid.getFormatId() != FORMAT_ID) {
+            return false;
+        }
+        byte[] gtrid = xid.getGlobalTransactionId();
+        // An Xid that shares the format id but is too short to hold job id + subtask index was
+        // not produced by this generator.
+        if (gtrid == null || gtrid.length < JOB_ID_BYTES + Integer.BYTES) {
+            return false;
+        }
+        int subtaskIndex = readNumber(gtrid, JOB_ID_BYTES, Integer.BYTES);
+        if (subtaskIndex != sinkContext.getIndexOfSubtask()
+            && subtaskIndex <= sinkContext.getNumberOfParallelSubtasks() - 1) {
+            return false;
+        }
+        byte[] jobIdBytes = encodeJobId(context);
+        if (jobIdBytes.length > JOB_ID_BYTES) {
+            return false;
+        }
+        // Compare against the job id padded the same way generateXid() stores it.
+        byte[] expected = Arrays.copyOf(jobIdBytes, JOB_ID_BYTES);
+        byte[] actual = Arrays.copyOfRange(gtrid, 0, JOB_ID_BYTES);
+        return Arrays.equals(actual, expected);
+    }
+
+    /** Encodes the job id with a fixed charset so the bytes do not depend on the platform. */
+    private static byte[] encodeJobId(SeaTunnelContext context) {
+        return context.getJobId().getBytes(StandardCharsets.UTF_8);
+    }
+
+    /** Reads {@code numBytes} bytes starting at {@code offset}, least significant byte first. */
+    private static int readNumber(byte[] bytes, int offset, int numBytes) {
+        final int number = 0xff;
+        int result = 0;
+        for (int i = 0; i < numBytes; i++) {
+            result |= (bytes[offset + i] & number) << Byte.SIZE * i;
+        }
+        return result;
+    }
+
+    /** Writes the lowest {@code numBytes} bytes of {@code number}, least significant first. */
+    private static void writeNumber(long number, int numBytes, byte[] dst, int dstOffset) {
+        for (int i = dstOffset; i < dstOffset + numBytes; i++) {
+            dst[i] = (byte) number;
+            number >>>= Byte.SIZE;
+        }
+    }
+
+    private byte[] getRandomBytes(int size) {
+        byte[] bytes = new byte[size];
+        SECURE_RANDOM.nextBytes(bytes);
+        return bytes;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
new file mode 100644
index 00000000..2f8b78be
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacade.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Facade over the XA operations (start, end/prepare, commit, rollback, recover) performed on a
+ * JDBC connection obtained through {@link JdbcConnectionProvider}.
+ *
+ * <p>Typical workflow:
+ *
+ * <ol>
+ *   <li>{@link #open}
+ *   <li>{@link #start} transaction
+ *   <li>{@link #getConnection}, write some data
+ *   <li>{@link #endAndPrepare} (or {@link #failAndRollback})
+ *   <li>{@link #commit} / {@link #rollback}
+ *   <li>{@link #close}
+ * </ol>
+ *
+ * {@link #recover} can be used to get abandoned prepared transactions for cleanup.
+ */
+
+public interface XaFacade
+        extends JdbcConnectionProvider, Serializable, AutoCloseable {
+
+    /**
+     * Creates the default facade implementation ({@code XaFacadeImplAutoLoad}) for the given
+     * options.
+     */
+    static XaFacade fromJdbcConnectionOptions(
+            JdbcConnectorOptions jdbcConnectorOptions) {
+        return  new XaFacadeImplAutoLoad(jdbcConnectorOptions);
+    }
+
+    /** Opens the facade; first step of the workflow described above. */
+    void open() throws Exception;
+
+    /** @return whether the facade is currently open. */
+    boolean isOpen();
+
+    /** Start a new transaction. */
+    void start(Xid xid) throws Exception;
+
+    /** End and then prepare the transaction. Transaction can't be resumed afterwards. */
+    void endAndPrepare(Xid xid) throws Exception;
+
+    /**
+     * Commit previously prepared transaction.
+     *
+     * @param ignoreUnknown whether to ignore {@link XAException#XAER_NOTA
+     *     XAER_NOTA} error.
+     */
+    void commit(Xid xid, boolean ignoreUnknown) throws TransientXaException;
+
+    /** Rollback previously prepared transaction. */
+    void rollback(Xid xid) throws TransientXaException;
+
+    /**
+     * End transaction as {@link javax.transaction.xa.XAResource#TMFAIL failed}; in case of error,
+     * try to roll it back.
+     */
+    void failAndRollback(Xid xid) throws TransientXaException;
+
+    /**
+     * Note: this can block on some non-MVCC databases if there are ended not prepared transactions.
+     */
+    Collection<Xid> recover() throws TransientXaException;
+
+    /**
+     * Thrown by {@link XaFacade} when RM responds with {@link
+     * javax.transaction.xa.XAResource#XA_RDONLY XA_RDONLY} indicating that the transaction doesn't
+     * include any changes. When such a transaction is committed RM may return an error (usually,
+     * {@link XAException#XAER_NOTA XAER_NOTA}).
+     */
+    class EmptyXaTransactionException extends RuntimeException {
+        // Id of the transaction that turned out to be empty.
+        private final Xid xid;
+
+        EmptyXaTransactionException(Xid xid) {
+            super("end response XA_RDONLY, xid: " + xid);
+            this.xid = xid;
+        }
+
+        /** @return id of the transaction that contained no changes. */
+        public Xid getXid() {
+            return xid;
+        }
+    }
+
+    /**
+     * Indicates a transient or unknown failure from the resource manager (see {@link
+     * XAException#XA_RBTRANSIENT XA_RBTRANSIENT}, {@link XAException#XAER_RMFAIL XAER_RMFAIL}).
+     */
+    class TransientXaException extends RuntimeException {
+        // The original XAException is kept as the cause.
+        TransientXaException(XAException cause) {
+            super(cause);
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
new file mode 100644
index 00000000..d8c969ce
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaFacadeImplAutoLoad.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static javax.transaction.xa.XAException.XAER_NOTA;
+import static javax.transaction.xa.XAException.XAER_RMFAIL;
+import static javax.transaction.xa.XAException.XA_HEURCOM;
+import static javax.transaction.xa.XAException.XA_HEURHAZ;
+import static javax.transaction.xa.XAException.XA_HEURMIX;
+import static javax.transaction.xa.XAException.XA_HEURRB;
+import static javax.transaction.xa.XAException.XA_RBBASE;
+import static javax.transaction.xa.XAException.XA_RBTIMEOUT;
+import static javax.transaction.xa.XAException.XA_RBTRANSIENT;
+import static javax.transaction.xa.XAResource.TMENDRSCAN;
+import static javax.transaction.xa.XAResource.TMNOFLAGS;
+import static javax.transaction.xa.XAResource.TMSTARTRSCAN;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ThrowingRunnable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.XAConnection;
+import javax.sql.XADataSource;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Default {@link org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade} implementation.
+ */
+public class XaFacadeImplAutoLoad
+    implements XaFacade {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(XaFacadeImplAutoLoad.class);
+    private static final Set<Integer> TRANSIENT_ERR_CODES =
+        new HashSet<>(Arrays.asList(XA_RBTRANSIENT, XAER_RMFAIL));
+    private static final Set<Integer> HEUR_ERR_CODES =
+        new HashSet<>(Arrays.asList(XA_HEURRB, XA_HEURCOM, XA_HEURHAZ, XA_HEURMIX));
+    private static final int MAX_RECOVER_CALLS = 100;
+
+    private final JdbcConnectorOptions jdbcConnectorOptions;
+    private transient XAResource xaResource;
+    private transient Connection connection;
+    private transient XAConnection xaConnection;
+
+    XaFacadeImplAutoLoad(JdbcConnectorOptions jdbcConnectorOptions) {
+        checkState(jdbcConnectorOptions.isExactlyOnce(), "is_exactly_once config error");
+        this.jdbcConnectorOptions = jdbcConnectorOptions;
+    }
+
+    @Override
+    public void open() throws SQLException {
+        checkState(!isOpen(), "already connected");
+        XADataSource ds;
+        try {
+            ds = (XADataSource) DataSourceUtils.buildCommonDataSource(jdbcConnectorOptions);
+        }
+        catch (Exception e) {
+            throw new SQLException(e);
+        }
+        xaConnection = ds.getXAConnection();
+        xaResource = xaConnection.getXAResource();
+        if (jdbcConnectorOptions.getTransactionTimeoutSec().isPresent()) {
+            try {
+                xaResource.setTransactionTimeout(jdbcConnectorOptions.getTransactionTimeoutSec().get());
+            }
+            catch (XAException e) {
+                throw new SQLException(e);
+            }
+        }
+        connection = xaConnection.getConnection();
+        connection.setReadOnly(false);
+        connection.setAutoCommit(false);
+        checkState(!connection.getAutoCommit());
+    }
+
+    @Override
+    public void close() throws SQLException {
+        if (connection != null) {
+            connection.close(); // close connection - likely a wrapper
+            connection = null;
+        }
+        try {
+            xaConnection.close(); // close likely a pooled AND the underlying connection
+        }
+        catch (SQLException e) {
+            // Some databases (e.g. MySQL) rollback changes on normal client disconnect which
+            // causes an exception if an XA transaction was prepared. Note that resources are
+            // still released in case of an error. Pinning MySQL connections doesn't help as
+            // SuspendableXAConnection has the same close() logic.
+            // Other DBs don't rollback, e.g. for PgSql the previous connection.close() call
+            // disassociates the connection (and that call works because it has a check for XA)
+            // and rollback() is not called.
+            // In either case, not closing the XA connection here leads to the resource leak.
+            LOG.warn("unable to close XA connection", e);
+        }
+        xaResource = null;
+    }
+
+    /**
+     * Returns the current JDBC connection.
+     *
+     * @throws NullPointerException if there is no connection
+     */
+    @Override
+    public Connection getConnection() {
+        return checkNotNull(connection);
+    }
+
+    /**
+     * Checks whether the facade is open and its JDBC connection is still valid.
+     *
+     * <p>{@link Connection#getNetworkTimeout()} reports milliseconds while
+     * {@link Connection#isValid(int)} expects seconds, so the value is converted (rounded up so
+     * that a sub-second timeout does not become {@code 0}, which means "no timeout").
+     *
+     * @return {@code false} if not open or the connection is absent, otherwise the driver's verdict
+     * @throws SQLException if the driver fails to report the timeout or to validate
+     */
+    @Override
+    public boolean isConnectionValid() throws SQLException {
+        if (!isOpen() || connection == null) {
+            return false;
+        }
+        final long timeoutMillis = connection.getNetworkTimeout();
+        final int timeoutSeconds = (int) ((timeoutMillis + 999L) / 1000L);
+        return connection.isValid(timeoutSeconds);
+    }
+
+    /** Returns the JDBC connection, calling {@link #open()} first if the facade is not open. */
+    @Override
+    public Connection getOrEstablishConnection() throws SQLException {
+        if (isOpen()) {
+            return connection;
+        }
+        open();
+        return connection;
+    }
+
+    @Override
+    public void closeConnection() {
+        try {
+            close();
+        }
+        catch (SQLException e) {
+            LOG.warn("Connection close failed.", e);
+        }
+    }
+
+    /**
+     * Not supported by this implementation.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Connection reestablishConnection() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Starts the transaction branch {@code xid} on the XA resource with {@code TMNOFLAGS}. */
+    @Override
+    public void start(Xid xid) {
+        final ThrowingRunnable<XAException> startBranch = () -> xaResource.start(xid, TMNOFLAGS);
+        execute(Command.fromRunnable("start", xid, startBranch));
+    }
+
+    @Override
+    public void endAndPrepare(Xid xid) {
+        execute(Command.fromRunnable("end", xid, () -> xaResource.end(xid, XAResource.TMSUCCESS)));
+        int prepResult = execute(new Command<>("prepare", of(xid), () -> xaResource.prepare(xid)));
+        if (prepResult == XAResource.XA_RDONLY) {
+            throw new EmptyXaTransactionException(xid);
+        }
+        else if (prepResult != XAResource.XA_OK) {
+            throw new RuntimeException(
+                formatErrorMessage("prepare", of(xid), empty(), "response: " + prepResult));
+        }
+    }
+
+    @Override
+    public void failAndRollback(Xid xid) {
+        execute(
+            Command.fromRunnable(
+                "end (fail)",
+                xid,
+                () -> {
+                    xaResource.end(xid, XAResource.TMFAIL);
+                    xaResource.rollback(xid);
+                },
+                err -> {
+                    if (err.errorCode >= XA_RBBASE) {
+                        rollback(xid);
+                    }
+                    else {
+                        LOG.warn(
+                            formatErrorMessage(
+                                "end (fail)", of(xid), of(err.errorCode)));
+                    }
+                }));
+    }
+
+    /**
+     * Commits the transaction {@code xid} in two-phase mode.
+     *
+     * <p>Errors for which {@link #buildCommitErrorDesc} yields a description are logged as a
+     * warning; all others are rethrown.
+     */
+    @Override
+    public void commit(Xid xid, boolean ignoreUnknown) {
+        // not onePhase because the transaction should be prepared already
+        final boolean onePhase = false;
+        final ThrowingRunnable<XAException> commitBranch = () -> xaResource.commit(xid, onePhase);
+        final Function<XAException, Optional<String>> describeError =
+            e -> buildCommitErrorDesc(e, ignoreUnknown);
+        execute(Command.fromRunnableRecoverByWarn("commit", xid, commitBranch, describeError));
+    }
+
+    /**
+     * Rolls back the transaction {@code xid}.
+     *
+     * <p>Errors for which {@link #buildRollbackErrorDesc} yields a description are logged as a
+     * warning; all others are rethrown.
+     */
+    @Override
+    public void rollback(Xid xid) {
+        final ThrowingRunnable<XAException> rollbackBranch = () -> xaResource.rollback(xid);
+        final Function<XAException, Optional<String>> describeError = this::buildRollbackErrorDesc;
+        execute(Command.fromRunnableRecoverByWarn("rollback", xid, rollbackBranch, describeError));
+    }
+
+    private void forget(Xid xid) {
+        execute(
+            Command.fromRunnableRecoverByWarn(
+                "forget",
+                xid,
+                () -> xaResource.forget(xid),
+                e -> of("manual cleanup may be required")));
+    }
+
+    /**
+     * Collects transaction ids reported by the XA resource's recovery scan.
+     *
+     * <p>The scan is started with {@code TMSTARTRSCAN}, continued with {@code TMNOFLAGS} for as
+     * long as a call adds elements, and always finished with {@code TMENDRSCAN}.
+     *
+     * @return the ids gathered during the scan
+     * @throws IllegalStateException if the scan needs more than {@code MAX_RECOVER_CALLS} rounds
+     */
+    @Override
+    public Collection<Xid> recover() {
+        return execute(
+            new Command<>(
+                "recover",
+                empty(),
+                () -> {
+                    List<Xid> list = recover(TMSTARTRSCAN);
+                    try {
+                        // NOTE(review): addAll requires the list returned by recover(int) to be
+                        // growable whenever a follow-up call returns a non-empty result.
+                        for (int i = 0; list.addAll(recover(TMNOFLAGS)); i++) {
+                            // H2 sometimes returns same tx list here - should probably use
+                            // recover(TMSTARTRSCAN | TMENDRSCAN)
+                            checkState(
+                                i < MAX_RECOVER_CALLS, "too many xa_recover() calls");
+                        }
+                    }
+                    finally {
+                        // End the scan even if a follow-up call failed; its result is discarded.
+                        recover(TMENDRSCAN);
+                    }
+                    return list;
+                }));
+    }
+
+    /** @return {@code true} while an {@link XAResource} is held, i.e. {@code xaResource} is non-null. */
+    @Override
+    public boolean isOpen() {
+        return xaResource != null;
+    }
+
+    private List<Xid> recover(int flags) throws XAException {
+        return Arrays.asList(xaResource.recover(flags));
+    }
+
+    /**
+     * Runs {@code cmd} against the open XA resource and translates failures.
+     *
+     * <ul>
+     *   <li>{@link XAException}: for a heuristic error code the transaction (if the command has
+     *       an xid) is forgotten first; then the command's own recovery function decides whether
+     *       a value is returned or the wrapped exception is thrown.
+     *   <li>{@link RuntimeException}: rethrown unchanged.
+     *   <li>any other exception: wrapped via {@link #wrapException}.
+     * </ul>
+     *
+     * @throws IllegalStateException if the facade is not open
+     */
+    private <T> T execute(Command<T> cmd) throws RuntimeException {
+        checkState(isOpen(), "not connected");
+        LOG.debug("{}, xid={}", cmd.name, cmd.xid);
+        try {
+            T result = cmd.callable.call();
+            LOG.trace("{} succeeded , xid={}", cmd.name, cmd.xid);
+            return result;
+        }
+        catch (XAException e) {
+            // NOTE(review): forget() itself goes through execute(), so a heuristic error raised
+            // by forget would trigger another forget - confirm this cannot loop.
+            if (HEUR_ERR_CODES.contains(e.errorCode)) {
+                cmd.xid.ifPresent(this::forget);
+            }
+            return cmd.recover.apply(e).orElseThrow(() -> wrapException(cmd.name, cmd.xid, e));
+        }
+        catch (RuntimeException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw wrapException(cmd.name, cmd.xid, e);
+        }
+    }
+
+    private static RuntimeException wrapException(
+        String action, Optional<Xid> xid, Exception ex) {
+        if (ex instanceof XAException) {
+            XAException xa = (XAException) ex;
+            if (TRANSIENT_ERR_CODES.contains(xa.errorCode)) {
+                throw new TransientXaException(xa);
+            }
+            else {
+                throw new RuntimeException(
+                    formatErrorMessage(action, xid, of(xa.errorCode), xa.getMessage()));
+            }
+        }
+        else {
+            throw new RuntimeException(
+                formatErrorMessage(action, xid, empty(), ex.getMessage()), ex);
+        }
+    }
+
+    private Optional<String> buildCommitErrorDesc(XAException err, boolean ignoreUnknown) {
+        if (err.errorCode == XA_HEURCOM) {
+            return Optional.of("transaction was heuristically committed earlier");
+        }
+        else if (ignoreUnknown && err.errorCode == XAER_NOTA) {
+            return Optional.of("transaction is unknown to RM (ignoring)");
+        }
+        else {
+            return empty();
+        }
+    }
+
+    private Optional<String> buildRollbackErrorDesc(XAException err) {
+        if (err.errorCode == XA_HEURRB) {
+            return Optional.of("transaction was already heuristically rolled back");
+        }
+        else if (err.errorCode >= XA_RBBASE) {
+            return Optional.of("transaction was already marked for rollback");
+        }
+        else {
+            return empty();
+        }
+    }
+
+    private static String formatErrorMessage(
+        String action, Optional<Xid> xid, Optional<Integer> errorCode, String... more) {
+        return String.format(
+            "unable to %s%s%s%s",
+            action,
+            xid.map(x -> " XA transaction, xid: " + x).orElse(""),
+            errorCode
+                .map(code -> String.format(", error %d: %s", code, descError(code)))
+                .orElse(""),
+            more == null || more.length == 0 ? "" : ". " + Arrays.toString(more));
+    }
+
+    /**
+     * Maps an XA error code to a short description taken from the {@link XAException} javadoc,
+     * to ease debugging.
+     *
+     * @return the description, or an empty string for a code that is not listed
+     */
+    private static String descError(int code) {
+        switch (code) {
+            // heuristic outcomes
+            case XAException.XA_HEURCOM:
+                return "heuristic commit decision was made";
+            case XAException.XA_HEURHAZ:
+                return "heuristic decision may have been made";
+            case XAException.XA_HEURMIX:
+                return "heuristic mixed decision was made";
+            case XAException.XA_HEURRB:
+                return "heuristic rollback decision was made";
+            // rollback reasons
+            case XAException.XA_RBCOMMFAIL:
+                return "rollback happened due to a communications failure";
+            case XAException.XA_RBDEADLOCK:
+                return "rollback happened because deadlock was detected";
+            case XAException.XA_RBINTEGRITY:
+                return "rollback happened because an internal integrity check failed";
+            case XAException.XA_RBOTHER:
+                return "rollback happened for some reason not fitting any of the other rollback error codes";
+            case XAException.XA_RBPROTO:
+                return "rollback happened due to a protocol error in the resource manager";
+            case XAException.XA_RBROLLBACK:
+                return "rollback happened for an unspecified reason";
+            case XAException.XA_RBTIMEOUT:
+                return "rollback happened because of a timeout";
+            case XAException.XA_RBTRANSIENT:
+                return "rollback happened due to a transient failure";
+            // other XA_* codes
+            case XAException.XA_NOMIGRATE:
+                return "the transaction resumption must happen where the suspension occurred";
+            case XAException.XA_RDONLY:
+                return "the transaction branch was read-only, and has already been committed";
+            case XAException.XA_RETRY:
+                return "the method invoked returned without having any effect, and that it may be invoked again";
+            // XAER_* errors
+            case XAException.XAER_ASYNC:
+                return "an asynchronous operation is outstanding";
+            case XAException.XAER_DUPID:
+                return "Xid given as an argument is already known to the resource manager";
+            case XAException.XAER_INVAL:
+                return "invalid arguments were passed";
+            case XAException.XAER_NOTA:
+                return "Xid is not valid";
+            case XAException.XAER_OUTSIDE:
+                return "the resource manager is doing work outside the global transaction";
+            case XAException.XAER_PROTO:
+                return "protocol error";
+            case XAException.XAER_RMERR:
+                return "resource manager error has occurred";
+            case XAException.XAER_RMFAIL:
+                return "the resource manager has failed and is not available";
+            default:
+                return "";
+        }
+    }
+
+    /**
+     * An XA operation bundled with what is needed to run and report it: a name for log and error
+     * messages, the optional transaction id, the action itself and a function deciding what to
+     * do with an {@link XAException} raised by the action.
+     *
+     * @param <T> type of the value produced by the action
+     */
+    private static class Command<T> {
+        // Operation name used in log and error messages.
+        private final String name;
+        // Transaction the operation applies to; empty for operations without one (e.g. recover).
+        private final Optional<Xid> xid;
+        // The operation itself.
+        private final Callable<T> callable;
+        // Maps an XAException to a replacement result; an empty Optional means "rethrow".
+        private final Function<XAException, Optional<T>> recover;
+
+        /** Creates a command for a void action whose {@link XAException} is always rethrown wrapped. */
+        static Command<Object> fromRunnable(
+            String action, Xid xid, ThrowingRunnable<XAException> runnable) {
+            return fromRunnable(
+                action,
+                xid,
+                runnable,
+                e -> {
+                    throw wrapException(action, of(xid), e);
+                });
+        }
+
+        /**
+         * Creates a command for a void action whose {@link XAException} is logged as a warning
+         * when {@code err2msg} yields a description for it, and rethrown wrapped otherwise.
+         */
+        static Command<Object> fromRunnableRecoverByWarn(
+            String action,
+            Xid xid,
+            ThrowingRunnable<XAException> runnable,
+            Function<XAException, Optional<String>> err2msg) {
+            return fromRunnable(
+                action,
+                xid,
+                runnable,
+                e ->
+                    LOG.warn(
+                        formatErrorMessage(
+                            action,
+                            of(xid),
+                            of(e.errorCode),
+                            err2msg.apply(e)
+                                .orElseThrow(
+                                    () ->
+                                        wrapException(
+                                            action, of(xid), e)))));
+        }
+
+        /**
+         * Creates a command for a void action with a custom error handler. If {@code recover}
+         * returns normally the error counts as handled; to fail, the handler must throw.
+         */
+        private static Command<Object> fromRunnable(
+            String action,
+            Xid xid,
+            ThrowingRunnable<XAException> runnable,
+            Consumer<XAException> recover) {
+            return new Command<>(
+                action,
+                of(xid),
+                () -> {
+                    runnable.run();
+                    return null;
+                },
+                e -> {
+                    recover.accept(e);
+                    // Any present value tells the caller that the error was handled.
+                    return Optional.of("");
+                });
+        }
+
+        /** Creates a command without error recovery: every {@link XAException} is rethrown. */
+        private Command(String name, Optional<Xid> xid, Callable<T> callable) {
+            this(name, xid, callable, e -> empty());
+        }
+
+        private Command(
+            String name,
+            Optional<Xid> xid,
+            Callable<T> callable,
+            Function<XAException, Optional<T>> recover) {
+            this.name = name;
+            this.xid = xid;
+            this.callable = callable;
+            this.recover = recover;
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
new file mode 100644
index 00000000..e37e6b05
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOps.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Operations applied to groups of XA transactions.
+ */
+public interface XaGroupOps extends Serializable {
+
+    /**
+     * Commit a batch of transactions.
+     *
+     * @param xids                   transactions to commit
+     * @param allowOutOfOrderCommits whether out-of-order commits are allowed
+     * @param maxCommitAttempts      maximum number of commit attempts per transaction
+     * @return the outcome of the batch
+     */
+    GroupXaOperationResult<XidInfo> commit(
+        List<XidInfo> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts);
+
+    /** Roll back the given transactions. */
+    void rollback(List<XidInfo> xids);
+
+    /** Fail and roll back the given transactions, reporting the outcome. */
+    GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids);
+
+    /** Recover transactions and roll them back; {@code excludeXid} is left out. */
+    void recoverAndRollback(
+        SeaTunnelContext context,
+        SinkWriter.Context sinkContext,
+        XidGenerator xidGenerator,
+        Xid excludeXid);
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
new file mode 100644
index 00000000..05ecce16
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+public class XaGroupOpsImpl
+    implements XaGroupOps {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(XaGroupOpsImpl.class);
+
+    private final XaFacade xaFacade;
+
+    /** Creates group operations that delegate every single-transaction call to {@code xaFacade}. */
+    public XaGroupOpsImpl(XaFacade xaFacade) {
+        this.xaFacade = xaFacade;
+    }
+
+    /**
+     * Commits the given transactions one by one, removing each attempted entry from
+     * {@code xids}.
+     *
+     * <p>Processing stops at the first failure unless {@code allowOutOfOrderCommits} is set.
+     * Entries that were not attempted are queued for retry together with the transiently
+     * failed ones.
+     *
+     * @throws RuntimeException if an entry queued for retry has reached
+     *     {@code maxCommitAttempts}; non-transient failures are raised through
+     *     {@code throwIfAnyFailed}
+     */
+    @Override
+    public GroupXaOperationResult<XidInfo> commit(
+        List<XidInfo> xids, boolean allowOutOfOrderCommits, int maxCommitAttempts) {
+        final GroupXaOperationResult<XidInfo> outcome = new GroupXaOperationResult<>();
+        final int total = xids.size();
+        LOG.debug("commit {} transactions", total);
+        final Iterator<XidInfo> pending = xids.iterator();
+        while (pending.hasNext() && (outcome.hasNoFailures() || allowOutOfOrderCommits)) {
+            final XidInfo current = pending.next();
+            pending.remove();
+            try {
+                // NOTE(review): ignoreUnknown is always false, so a retried commit that the RM
+                // answers with XAER_NOTA counts as a failure - confirm this is intended.
+                xaFacade.commit(current.getXid(), false);
+                outcome.succeeded(current);
+            } catch (XaFacade.TransientXaException e) {
+                outcome.failedTransiently(current.withAttemptsIncremented(), e);
+            } catch (Exception e) {
+                outcome.failed(current, e);
+            }
+        }
+        // Whatever is still in xids was never attempted.
+        outcome.getForRetry().addAll(xids);
+        outcome.throwIfAnyFailed("commit");
+        throwIfAnyReachedMaxAttempts(outcome, maxCommitAttempts);
+        outcome.getTransientFailure()
+            .ifPresent(
+                failure ->
+                    LOG.warn(
+                        "failed to commit {} transactions out of {} (keep them to retry later)",
+                        outcome.getForRetry().size(),
+                        total,
+                        failure));
+        return outcome;
+    }
+
+    /** Rolls back the given transactions in order; the first failure propagates to the caller. */
+    @Override
+    public void rollback(List<XidInfo> xids) {
+        final Iterator<XidInfo> pending = xids.iterator();
+        while (pending.hasNext()) {
+            final XidInfo current = pending.next();
+            xaFacade.rollback(current.getXid());
+        }
+    }
+
+    /**
+     * Fails and rolls back every given transaction, recording the outcome per transaction
+     * instead of throwing.
+     *
+     * @return the collected outcome; empty when {@code xids} is empty
+     */
+    @Override
+    public GroupXaOperationResult<XidInfo> failAndRollback(Collection<XidInfo> xids) {
+        final GroupXaOperationResult<XidInfo> outcome = new GroupXaOperationResult<>();
+        if (xids.isEmpty()) {
+            return outcome;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("rolling back {} transactions: {}", xids.size(), xids);
+        }
+        for (XidInfo current : xids) {
+            try {
+                xaFacade.failAndRollback(current.getXid());
+                outcome.succeeded(current);
+            } catch (XaFacade.TransientXaException transientFailure) {
+                LOG.info(
+                    "unable to fail/rollback transaction, xid={}: {}",
+                    current,
+                    transientFailure.getMessage());
+                outcome.failedTransiently(current, transientFailure);
+            } catch (Exception failure) {
+                LOG.warn(
+                    "unable to fail/rollback transaction, xid={}: {}",
+                    current,
+                    failure.getMessage());
+                outcome.failed(current, failure);
+            }
+        }
+        final int retryCount = outcome.getForRetry().size();
+        if (retryCount != 0) {
+            LOG.info("failed to roll back {} transactions", outcome.getForRetry().size());
+        }
+        return outcome;
+    }
+
+    /**
+     * Rolls back recovered transactions that the generator attributes to this subtask, leaving
+     * out {@code excludeXid}.
+     *
+     * <p>The recovered ids are copied into a fresh list before {@code excludeXid} is removed:
+     * the collection handed out by the facade may be fixed-size, in which case removing from it
+     * directly would throw {@link UnsupportedOperationException}.
+     *
+     * <p>A failed rollback of a single transaction is logged and does not stop the loop.
+     */
+    @Override
+    public void recoverAndRollback(SeaTunnelContext context, SinkWriter.Context sinkContext, XidGenerator xidGenerator, Xid excludeXid) {
+        final List<Xid> recovered = new ArrayList<>(xaFacade.recover());
+        recovered.remove(excludeXid);
+        if (recovered.isEmpty()) {
+            return;
+        }
+        LOG.warn("rollback {} recovered transactions", recovered.size());
+        for (Xid xid : recovered) {
+            if (xidGenerator.belongsToSubtask(xid, context, sinkContext)) {
+                try {
+                    xaFacade.rollback(xid);
+                } catch (Exception e) {
+                    LOG.info("unable to rollback recovered transaction, xid={}", xid, e);
+                }
+            }
+        }
+    }
+
+    private static void throwIfAnyReachedMaxAttempts(
+        GroupXaOperationResult<XidInfo> result, int maxAttempts) {
+        List<XidInfo> reached = null;
+        for (XidInfo x : result.getForRetry()) {
+            if (x.getAttempts() >= maxAttempts) {
+                if (reached == null) {
+                    reached = new ArrayList<>();
+                }
+                reached.add(x);
+            }
+        }
+        if (reached != null) {
+            throw new RuntimeException(
+                String.format(
+                    "reached max number of commit attempts (%d) for transactions: %s",
+                    maxAttempts, reached));
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
new file mode 100644
index 00000000..a8017505
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidGenerator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.security.SecureRandom;
+
+/**
+ * {@link Xid} generator that can also tell whether an existing xid belongs to a given subtask.
+ */
+public interface XidGenerator extends Serializable, AutoCloseable {
+
+    /**
+     * Creates a {@link XidGenerator} that generates {@link Xid xids} from:
+     *
+     * <ol>
+     *   <li>job id
+     *   <li>subtask index
+     *   <li>checkpoint id
+     *   <li>four random bytes generated using {@link SecureRandom}
+     * </ol>
+     *
+     * <p>Each created {@link XidGenerator} instance MUST be used for only one Sink instance
+     * (otherwise Xids could collide).
+     */
+    static XidGenerator semanticXidGenerator() {
+        return new SemanticXidGenerator();
+    }
+
+    /** Optional initialization hook; the default implementation does nothing. */
+    default void open() {}
+
+    /** Produces a {@link Xid} for the given contexts and checkpoint id. */
+    Xid generateXid(SeaTunnelContext context, SinkWriter.Context sinkContext, long checkpointId);
+
+    /** Returns true if the provided transaction belongs to this subtask. */
+    boolean belongsToSubtask(Xid xid, SeaTunnelContext context, SinkWriter.Context sinkContext);
+
+    /** Optional cleanup hook; the default implementation does nothing and throws no checked exception. */
+    @Override
+    default void close() {}
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java
new file mode 100644
index 00000000..f5b1f057
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XidImpl.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A simple {@link Xid} implementation that stores branch and global transaction identifiers as byte
+ * arrays. Instances are immutable: the arrays are copied both when passed in and when handed out.
+ */
+final class XidImpl implements Xid, Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final char[] HEX_CHARS = "0123456789abcdef".toCharArray();
+    private static final int HIGH_NIBBLE_MASK = 0xF0;
+    private static final int LOW_NIBBLE_MASK = 0x0F;
+    private static final int NIBBLE_BITS = 4;
+
+    private final int formatId;
+    private final byte[] globalTransactionId;
+    private final byte[] branchQualifier;
+
+    /**
+     * Creates an xid from copies of the given identifiers.
+     *
+     * @param formatId value returned by {@link #getFormatId()}
+     * @param globalTransactionId global transaction id, at most {@link Xid#MAXGTRIDSIZE} bytes
+     * @param branchQualifier branch qualifier, at most {@link Xid#MAXBQUALSIZE} bytes
+     * @throws IllegalArgumentException if an identifier is longer than its limit
+     */
+    public XidImpl(int formatId, byte[] globalTransactionId, byte[] branchQualifier) {
+        checkArgument(globalTransactionId.length <= Xid.MAXGTRIDSIZE);
+        checkArgument(branchQualifier.length <= Xid.MAXBQUALSIZE);
+        this.formatId = formatId;
+        this.globalTransactionId = globalTransactionId.clone();
+        this.branchQualifier = branchQualifier.clone();
+    }
+
+    @Override
+    public int getFormatId() {
+        return formatId;
+    }
+
+    /** Returns a copy, so the caller cannot change this xid through the returned array. */
+    @Override
+    public byte[] getGlobalTransactionId() {
+        return globalTransactionId.clone();
+    }
+
+    /** Returns a copy, so the caller cannot change this xid through the returned array. */
+    @Override
+    public byte[] getBranchQualifier() {
+        return branchQualifier.clone();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof XidImpl)) {
+            return false;
+        }
+        XidImpl other = (XidImpl) o;
+        return formatId == other.formatId
+            && Arrays.equals(globalTransactionId, other.globalTransactionId)
+            && Arrays.equals(branchQualifier, other.branchQualifier);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = Objects.hash(formatId);
+        result = prime * result + Arrays.hashCode(globalTransactionId);
+        result = prime * result + Arrays.hashCode(branchQualifier);
+        return result;
+    }
+
+    /** Renders the xid as {@code formatId:globalTransactionId:branchQualifier}, ids in hex. */
+    @Override
+    public String toString() {
+        return formatId + ":" + byteToHexString(globalTransactionId) + ":" + byteToHexString(branchQualifier);
+    }
+
+    /**
+     * Converts the bytes in {@code [start, end)} to lowercase hex, two characters per byte.
+     *
+     * @param bytes the bytes to convert in a hex string
+     * @param start start index, inclusively
+     * @param end end index, exclusively
+     * @return hex string representation of the selected bytes
+     * @throws IllegalArgumentException if {@code bytes} is null
+     */
+    public static String byteToHexString(final byte[] bytes, final int start, final int end) {
+        if (bytes == null) {
+            throw new IllegalArgumentException("bytes == null");
+        }
+        char[] out = new char[(end - start) * 2];
+        for (int i = start, j = 0; i < end; i++) {
+            out[j++] = HEX_CHARS[(HIGH_NIBBLE_MASK & bytes[i]) >>> NIBBLE_BITS];
+            out[j++] = HEX_CHARS[LOW_NIBBLE_MASK & bytes[i]];
+        }
+        return new String(out);
+    }
+
+    /**
+     * Converts the whole array to lowercase hex, two characters per byte.
+     *
+     * @param bytes the bytes to convert in a hex string
+     * @return hex string representation of the byte array
+     */
+    public static String byteToHexString(final byte[] bytes) {
+        return byteToHexString(bytes, 0, bytes.length);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
new file mode 100644
index 00000000..849ca50c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XidGenerator;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.transaction.xa.Xid;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link SinkWriter} that writes rows inside XA transactions. Each {@link #prepareCommit()}
+ * prepares the transaction in progress and starts the next one.
+ */
+public class JdbcExactlyOnceSinkWriter
+    implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcExactlyOnceSinkWriter.class);
+
+    private final SinkWriter.Context sinkcontext;
+    private final SeaTunnelContext context;
+    /** States given at construction; when non-empty, the first one's xid is spared on recovery. */
+    private final List<JdbcSinkState> recoverStates;
+    private final XaFacade xaFacade;
+    private final XaGroupOps xaGroupOps;
+    private final XidGenerator xidGenerator;
+    private final JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
+
+    private transient boolean isOpen;
+    /** Xid of the transaction in progress; null before opening and after {@link #close()}. */
+    private transient Xid currentXid;
+    /** Xid of the most recent transaction for which {@code endAndPrepare} succeeded. */
+    private transient Xid prepareXid;
+
+    /** Wires up the XA facade and output format; rejects options whose {@code maxRetries} is not 0. */
+    public JdbcExactlyOnceSinkWriter(
+        SinkWriter.Context sinkcontext, SeaTunnelContext context, JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
+        JdbcConnectorOptions jdbcConnectorOptions, List<JdbcSinkState> states) {
+        checkArgument(jdbcConnectorOptions.getMaxRetries() == 0,
+            "JDBC XA sink requires maxRetries equal to 0, otherwise it could "
+                + "cause duplicates.");
+        this.context = context;
+        this.sinkcontext = sinkcontext;
+        this.recoverStates = states;
+        this.xidGenerator = XidGenerator.semanticXidGenerator();
+        this.xaFacade = XaFacade.fromJdbcConnectionOptions(jdbcConnectorOptions);
+        this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
+        this.outputFormat = new JdbcOutputFormat<>(xaFacade, jdbcConnectorOptions,
+            () -> new SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(), statementBuilder));
+    }
+
+    /** Opens all resources once, runs the recovery rollback if states were given, then starts a transaction. */
+    private void tryOpen() throws IOException {
+        if (isOpen) {
+            return;
+        }
+        isOpen = true;
+        try {
+            xidGenerator.open();
+            xaFacade.open();
+            outputFormat.open();
+            if (!recoverStates.isEmpty()) {
+                // Rollback pending transactions that should not include recoverStates
+                xaGroupOps.recoverAndRollback(context, sinkcontext, xidGenerator, recoverStates.get(0).getXid());
+            }
+            beginTx();
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+
+    /** Returns one state holding the prepared xid; fails if no transaction has been prepared. */
+    @Override
+    public List<JdbcSinkState> snapshotState(long checkpointId) {
+        checkState(prepareXid != null, "prepare xid must not be null");
+        return Collections.singletonList(new JdbcSinkState(prepareXid));
+    }
+
+    /** Passes a serialization-made copy of the row to the output format, opening on first use. */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        tryOpen();
+        checkState(currentXid != null, "current xid must not be null");
+        outputFormat.writeRecord(SerializationUtils.clone(element));
+    }
+
+    /**
+     * Prepares the transaction in progress and starts a new one. The writer is opened first, so a
+     * call that arrives before any {@link #write} no longer trips the "no current xid" check.
+     */
+    @Override
+    public Optional<XidInfo> prepareCommit() throws IOException {
+        tryOpen();
+        prepareCurrentTx();
+        this.currentXid = null;
+        beginTx();
+        checkState(prepareXid != null, "prepare xid must not be null");
+        return Optional.of(new XidInfo(prepareXid, 0));
+    }
+
+    @Override
+    public void abortPrepare() {
+        // no action is taken
+    }
+
+    /** Rolls back the transaction in progress (best effort), then closes facade and generator. */
+    @Override
+    public void close() throws IOException {
+        if (currentXid != null && xaFacade.isOpen()) {
+            try {
+                LOG.debug("remove current transaction before closing, xid={}", currentXid);
+                xaFacade.failAndRollback(currentXid);
+            } catch (Exception e) {
+                LOG.warn("unable to fail/rollback current transaction, xid={}", currentXid, e);
+            }
+        }
+        try {
+            xaFacade.close();
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        } finally {
+            // Runs even when closing the facade fails, so the generator is always released.
+            xidGenerator.close();
+            currentXid = null;
+            prepareXid = null;
+        }
+    }
+
+    /** Generates a new xid and starts an XA transaction under it. */
+    private void beginTx() throws IOException {
+        checkState(currentXid == null, "currentXid not null");
+        currentXid = xidGenerator.generateXid(context, sinkcontext, System.currentTimeMillis());
+        try {
+            xaFacade.start(currentXid);
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+
+    /** Flushes the output format, then ends and prepares the transaction in progress. */
+    private void prepareCurrentTx() throws IOException {
+        checkState(currentXid != null, "no current xid");
+        outputFormat.flush();
+        try {
+            xaFacade.endAndPrepare(currentXid);
+            prepareXid = currentXid;
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
new file mode 100644
index 00000000..bd37cde4
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+@AutoService(SeaTunnelSink.class)
+public class JdbcSink
+    implements SeaTunnelSink<SeaTunnelRow, JdbcSinkState, XidInfo, JdbcAggregatedCommitInfo> {
+
+    private Config pluginConfig;
+
+    private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
+
+    private SeaTunnelContext seaTunnelContext;
+
+    private JdbcConnectorOptions jdbcConnectorOptions;
+
+    @Override
+    public String getPluginName() {
+        return "jdbc";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig)
+        throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+        this.jdbcConnectorOptions = new JdbcConnectorOptions(this.pluginConfig);
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> createWriter(SinkWriter.Context context)
+        throws IOException {
+        SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> sinkWriter;
+        // TODO SeatunnelTyoeInfo is not good enough to get typesArray
+        JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) -> JdbcUtils.setRecordToStatement(st, null, row);
+        if (jdbcConnectorOptions.isExactlyOnce()) {
+            sinkWriter = new JdbcExactlyOnceSinkWriter(
+                context,
+                seaTunnelContext,
+                statementBuilder,
+                jdbcConnectorOptions,
+                new ArrayList<>()
+            );
+        } else {
+            sinkWriter = new JdbcSinkWriter(
+                context,
+                statementBuilder,
+                jdbcConnectorOptions);
+        }
+
+        return sinkWriter;
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> restoreWriter(SinkWriter.Context context, List<JdbcSinkState> states)
+        throws IOException {
+        if (jdbcConnectorOptions.isExactlyOnce()) {
+            JdbcStatementBuilder<SeaTunnelRow> statementBuilder = (st, row) -> JdbcUtils.setRecordToStatement(st, null, row);
+            return new JdbcExactlyOnceSinkWriter(
+                context,
+                seaTunnelContext,
+                statementBuilder,
+                jdbcConnectorOptions,
+                states
+            );
+        }
+        return SeaTunnelSink.super.restoreWriter(context, states);
+    }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo>> createAggregatedCommitter()
+        throws IOException {
+        if (jdbcConnectorOptions.isExactlyOnce()) {
+            return Optional.of(new JdbcSinkAggregatedCommitter(jdbcConnectorOptions));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowTypeInfo seaTunnelRowTypeInfo) {
+        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
+    }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return seaTunnelContext;
+    }
+
+    @Override
+    public Optional<Serializer<JdbcAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    @Override
+    public Optional<Serializer<XidInfo>> getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
new file mode 100644
index 00000000..e5c9f308
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.GroupXaOperationResult;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class JdbcSinkAggregatedCommitter
+    implements SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo> {
+
+    private final XaFacade xaFacade;
+    private final XaGroupOps xaGroupOps;
+    private final JdbcConnectorOptions jdbcConnectorOptions;
+
+    public JdbcSinkAggregatedCommitter(
+        JdbcConnectorOptions jdbcConnectorOptions
+    ) {
+        this.xaFacade = XaFacade.fromJdbcConnectionOptions(
+            jdbcConnectorOptions);
+        this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
+        this.jdbcConnectorOptions = jdbcConnectorOptions;
+    }
+
+    private void tryOpen() throws IOException {
+        if (!xaFacade.isOpen()) {
+            try {
+                xaFacade.open();
+            } catch (Exception e) {
+                ExceptionUtils.rethrowIOException(e);
+            }
+        }
+    }
+
+    @Override
+    public List<JdbcAggregatedCommitInfo> commit(List<JdbcAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
+        tryOpen();
+        return aggregatedCommitInfos.stream().map(aggregatedCommitInfo -> {
+            GroupXaOperationResult<XidInfo> result = xaGroupOps.commit(aggregatedCommitInfo.getXidInfoList(), false, jdbcConnectorOptions.getMaxCommitAttempts());
+            return new JdbcAggregatedCommitInfo(result.getForRetry());
+        }).filter(ainfo -> !ainfo.getXidInfoList().isEmpty()).collect(Collectors.toList());
+    }
+
+    @Override
+    public JdbcAggregatedCommitInfo combine(List<XidInfo> commitInfos) {
+        return new JdbcAggregatedCommitInfo(commitInfos);
+    }
+
+    @Override
+    public void abort(List<JdbcAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
+        tryOpen();
+        for (JdbcAggregatedCommitInfo commitInfos : aggregatedCommitInfo) {
+            xaGroupOps.rollback(commitInfos.getXidInfoList());
+        }
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+        try {
+            xaFacade.close();
+        } catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
new file mode 100644
index 00000000..626f379a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkCommitter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaFacade;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOps;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.xa.XaGroupOpsImpl;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+public class JdbcSinkCommitter
+    implements SinkCommitter<XidInfo> {
+    private final XaFacade xaFacade;
+    private final XaGroupOps xaGroupOps;
+    private final JdbcConnectorOptions jdbcConnectorOptions;
+
+    public JdbcSinkCommitter(
+        JdbcConnectorOptions jdbcConnectorOptions
+    )
+        throws IOException {
+        this.jdbcConnectorOptions = jdbcConnectorOptions;
+        this.xaFacade = XaFacade.fromJdbcConnectionOptions(
+            jdbcConnectorOptions);
+        this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
+        try {
+            xaFacade.open();
+        }
+        catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+
+    @Override
+    public List<XidInfo> commit(List<XidInfo> committables) {
+        return xaGroupOps
+            .commit(committables, false, jdbcConnectorOptions.getMaxCommitAttempts())
+            .getForRetry();
+    }
+
+    @Override
+    public void abort(List<XidInfo> commitInfos)
+        throws IOException {
+        try {
+            xaGroupOps.rollback(commitInfos);
+        }
+        catch (Exception e) {
+            ExceptionUtils.rethrowIOException(e);
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
new file mode 100644
index 00000000..548f1073
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcStatementBuilder;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectorOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
+
+import org.apache.commons.lang3.SerializationUtils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Sink writer that hands {@link SeaTunnelRow}s to a {@link JdbcOutputFormat} and flushes the
+ * format whenever a commit is prepared.
+ *
+ * <p>The output format is opened lazily on first use. This writer produces neither commit
+ * information nor snapshot state.
+ */
+public class JdbcSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> {
+
+    private final JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
+    private final SinkWriter.Context context;
+    private transient boolean isOpen;
+
+    /**
+     * Creates the writer; no connection is opened yet.
+     *
+     * @param context writer context supplied by the engine
+     * @param statementBuilder binds a row to the prepared statement
+     * @param jdbcConnectorOptions connector options, including the query to execute
+     */
+    public JdbcSinkWriter(
+        SinkWriter.Context context,
+        JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
+        JdbcConnectorOptions jdbcConnectorOptions) {
+
+        JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(jdbcConnectorOptions);
+
+        this.context = context;
+        this.outputFormat = new JdbcOutputFormat<>(
+            connectionProvider,
+            jdbcConnectorOptions,
+            () -> new SimpleBatchStatementExecutor<>(jdbcConnectorOptions.getQuery(), statementBuilder));
+    }
+
+    /**
+     * Opens the output format if it has not been opened successfully yet.
+     *
+     * @throws IOException if opening the output format fails
+     */
+    private void tryOpen() throws IOException {
+        if (!isOpen) {
+            outputFormat.open();
+            // Mark as open only after open() succeeded; otherwise a failed attempt would be
+            // remembered as "open" and later calls would use an unopened format.
+            isOpen = true;
+        }
+    }
+
+    /**
+     * Returns no state: this writer keeps nothing that has to be checkpointed.
+     *
+     * @param checkpointId id of the checkpoint being taken (unused)
+     * @return an empty list
+     */
+    @Override
+    public List<JdbcSinkState> snapshotState(long checkpointId) {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Writes one row, opening the output format first if necessary.
+     *
+     * @param element the row to write
+     * @throws IOException if opening the format or writing the record fails
+     */
+    @Override
+    public void write(SeaTunnelRow element)
+        throws IOException {
+        tryOpen();
+        // The format receives a serialization-based copy rather than the caller's instance
+        // (presumably because the format buffers rows and the caller may reuse the object -
+        // TODO confirm).
+        SeaTunnelRow copy = SerializationUtils.clone(element);
+        outputFormat.writeRecord(copy);
+    }
+
+    /**
+     * Flushes everything written so far.
+     *
+     * @return always empty; this writer emits no commit information
+     * @throws IOException if opening the format or flushing fails
+     */
+    @Override
+    public Optional<XidInfo> prepareCommit()
+        throws IOException {
+        // A commit can be requested before the first row arrives, so open lazily here as well,
+        // mirroring write(). NOTE(review): flush() on a never-opened format is presumed unsafe;
+        // confirm against JdbcOutputFormat.
+        tryOpen();
+        outputFormat.flush();
+        return Optional.empty();
+    }
+
+    /** Nothing to undo: {@link #prepareCommit()} hands out no commit information. */
+    @Override
+    public void abortPrepare() {
+
+    }
+
+    /**
+     * Closes the underlying output format.
+     *
+     * @throws IOException if closing the output format fails
+     */
+    @Override
+    public void close()
+        throws IOException {
+        outputFormat.close();
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java
new file mode 100644
index 00000000..e04a06dc
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcAggregatedCommitInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Aggregated commit information of the JDBC sink: the list of {@link XidInfo}s collected for
+ * one commit.
+ *
+ * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and the all-args constructor are
+ * generated by Lombok.
+ */
+@Data
+@AllArgsConstructor
+public class JdbcAggregatedCommitInfo implements Serializable {
+    // Pin the stream version so that recompiling the class does not silently change the
+    // default-computed UID and invalidate previously serialized instances.
+    private static final long serialVersionUID = 1L;
+
+    private final List<XidInfo> xidInfoList;
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java
new file mode 100644
index 00000000..d3261e8c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/JdbcSinkState.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+
+/**
+ * Writer state of the JDBC sink: a single XA transaction id.
+ *
+ * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and the all-args constructor are
+ * generated by Lombok.
+ *
+ * <p>NOTE(review): {@link Xid} does not itself extend {@link Serializable}, so serializing this
+ * state only works when the concrete {@code Xid} implementation is serializable - confirm for
+ * the implementation used by the connector.
+ */
+@Data
+@AllArgsConstructor
+public class JdbcSinkState implements Serializable {
+    // Pin the stream version so that recompiling the class does not silently change the
+    // default-computed UID and invalidate previously serialized instances.
+    private static final long serialVersionUID = 1L;
+
+    private final Xid xid;
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java
new file mode 100644
index 00000000..80cbe63f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/state/XidInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import javax.transaction.xa.Xid;
+
+import java.io.Serializable;
+
+/**
+ * An XA transaction id together with the number of attempts recorded for it.
+ *
+ * <p>Instances are immutable; {@link #withAttemptsIncremented()} returns a new instance.
+ * Accessors, {@code equals}/{@code hashCode}/{@code toString} and the all-args constructor are
+ * generated by Lombok.
+ */
+@Data
+@AllArgsConstructor
+public class XidInfo implements Serializable {
+
+    // Pin the stream version so that recompiling the class does not silently change the
+    // default-computed UID and invalidate previously serialized instances.
+    private static final long serialVersionUID = 1L;
+
+    // Private like the fields of the sibling state classes; access goes through the
+    // Lombok-generated getters.
+    private final Xid xid;
+    private final int attempts;
+
+    /**
+     * Returns a copy of this info whose attempt counter is one higher.
+     *
+     * @return a new {@code XidInfo} with the same xid and {@code attempts + 1}
+     */
+    public XidInfo withAttemptsIncremented() {
+        return new XidInfo(xid, attempts + 1);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java
new file mode 100644
index 00000000..f1d66d82
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ExceptionUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import java.io.IOException;
+
+/** Helpers for re-throwing an arbitrary {@link Throwable} through a restrictive signature. */
+public class ExceptionUtils {
+
+    /**
+     * Re-throws the given {@code Throwable} where no checked exception may be declared.
+     *
+     * <p>An {@link Error} or a {@link RuntimeException} is thrown as is; anything else is wrapped
+     * in a new {@link RuntimeException} that carries it as the cause.
+     *
+     * @param t The Throwable to be thrown.
+     */
+    public static void rethrow(Throwable t) {
+        if (t instanceof Error) {
+            throw (Error) t;
+        }
+        if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        }
+        throw new RuntimeException(t);
+    }
+
+    /**
+     * Re-throws the given {@code Throwable} where the signature allows only IOExceptions (besides
+     * RuntimeExceptions and Errors).
+     *
+     * <p>An IOException, a RuntimeException or an Error is thrown as is. Any other throwable is
+     * wrapped in an IOException that reuses its message and carries it as the cause.
+     *
+     * @param t The Throwable to be thrown.
+     * @throws IOException the given IOException itself, or the wrapper around a checked throwable
+     */
+    public static void rethrowIOException(Throwable t) throws IOException {
+        // unchecked throwables pass through untouched
+        if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        }
+        if (t instanceof Error) {
+            throw (Error) t;
+        }
+        if (t instanceof IOException) {
+            throw (IOException) t;
+        }
+        throw new IOException(t.getMessage(), t);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java
new file mode 100644
index 00000000..b3b32f5c
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+/** Utils for jdbc connectors. */
+public class JdbcUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class);
+
+    /**
+     * Binds every field of a row to the corresponding parameter of a prepared statement.
+     *
+     * <p>Field {@code i} of the row is bound to parameter {@code i + 1}. A column with a declared
+     * JDBC type is bound through {@link #setField}; a column without one ({@code typesArray} is
+     * {@code null}, empty, or shorter than the row) is bound with
+     * {@link PreparedStatement#setObject(int, Object)}.
+     *
+     * <p>WARNING: binding without a declared type is best effort. In particular it is not
+     * guaranteed that a JDBC driver accepts {@code PreparedStatement.setObject(pos, null)}.
+     *
+     * @param upload The prepared statement.
+     * @param typesArray The jdbc types of the row; may be {@code null} or empty when unknown.
+     * @param row The records to add to the output.
+     * @throws SQLException if the driver rejects a parameter
+     * @see PreparedStatement
+     */
+    public static void setRecordToStatement(PreparedStatement upload, int[] typesArray, SeaTunnelRow row)
+            throws SQLException {
+        // read the field array once instead of on every access
+        final Object[] fields = row.getFields();
+        // an empty array is treated like "no types provided", matching the warning guard below
+        final boolean typesProvided = typesArray != null && typesArray.length > 0;
+
+        if (typesProvided && typesArray.length != fields.length) {
+            LOG.warn(
+                    "Column SQL types array doesn't match arity of passed Row! Check the passed array...");
+        }
+
+        for (int i = 0; i < fields.length; i++) {
+            if (typesProvided && i < typesArray.length) {
+                setField(upload, typesArray[i], fields[i], i);
+            } else {
+                // no declared type for this column (previously an
+                // ArrayIndexOutOfBoundsException for empty/short arrays): let the driver decide
+                upload.setObject(i + 1, fields[i]);
+            }
+        }
+    }
+
+    /**
+     * Binds a single value to a prepared statement using the setter that matches its JDBC type.
+     *
+     * <p>A {@code null} value is bound with {@link PreparedStatement#setNull(int, int)}. For types
+     * without a dedicated case the value is bound with {@code setObject} and a warning is logged.
+     *
+     * @param upload The prepared statement.
+     * @param type The {@link java.sql.Types} constant of the column.
+     * @param field The value to bind; may be {@code null}.
+     * @param index Zero-based field index; the statement parameter used is {@code index + 1}.
+     * @throws SQLException if the driver rejects the parameter
+     * @throws ClassCastException if the value's Java type does not match {@code type}; the
+     *     message is extended with the (zero-based) field index and the value
+     */
+    public static void setField(PreparedStatement upload, int type, Object field, int index)
+            throws SQLException {
+        if (field == null) {
+            upload.setNull(index + 1, type);
+        } else {
+            try {
+                // casting values as suggested by
+                // http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html
+                switch (type) {
+                    case java.sql.Types.NULL:
+                        upload.setNull(index + 1, type);
+                        break;
+                    case java.sql.Types.BOOLEAN:
+                    case java.sql.Types.BIT:
+                        upload.setBoolean(index + 1, (boolean) field);
+                        break;
+                    case java.sql.Types.CHAR:
+                    case java.sql.Types.NCHAR:
+                    case java.sql.Types.VARCHAR:
+                    case java.sql.Types.LONGVARCHAR:
+                    case java.sql.Types.LONGNVARCHAR:
+                        upload.setString(index + 1, (String) field);
+                        break;
+                    case java.sql.Types.TINYINT:
+                        upload.setByte(index + 1, (byte) field);
+                        break;
+                    case java.sql.Types.SMALLINT:
+                        upload.setShort(index + 1, (short) field);
+                        break;
+                    case java.sql.Types.INTEGER:
+                        upload.setInt(index + 1, (int) field);
+                        break;
+                    case java.sql.Types.BIGINT:
+                        upload.setLong(index + 1, (long) field);
+                        break;
+                    case java.sql.Types.REAL:
+                        upload.setFloat(index + 1, (float) field);
+                        break;
+                    case java.sql.Types.FLOAT:
+                    case java.sql.Types.DOUBLE:
+                        upload.setDouble(index + 1, (double) field);
+                        break;
+                    case java.sql.Types.DECIMAL:
+                    case java.sql.Types.NUMERIC:
+                        upload.setBigDecimal(index + 1, (java.math.BigDecimal) field);
+                        break;
+                    case java.sql.Types.DATE:
+                        upload.setDate(index + 1, (java.sql.Date) field);
+                        break;
+                    case java.sql.Types.TIME:
+                        upload.setTime(index + 1, (java.sql.Time) field);
+                        break;
+                    case java.sql.Types.TIMESTAMP:
+                        upload.setTimestamp(index + 1, (java.sql.Timestamp) field);
+                        break;
+                    case java.sql.Types.BINARY:
+                    case java.sql.Types.VARBINARY:
+                    case java.sql.Types.LONGVARBINARY:
+                        upload.setBytes(index + 1, (byte[]) field);
+                        break;
+                    default:
+                        // Types without a dedicated setter above end up here, e.g. SQLXML, ARRAY,
+                        // JAVA_OBJECT, BLOB, CLOB, NCLOB, DATALINK, DISTINCT, OTHER, REF, ROWID
+                        // and STRUCT.
+                        upload.setObject(index + 1, field);
+                        LOG.warn(
+                                "Unmanaged sql type ({}) for column {}. Best effort approach to set its value: {}.",
+                                type,
+                                index + 1,
+                                field);
+                        break;
+                }
+            } catch (ClassCastException e) {
+                // enrich the exception with detailed information.
+                String errorMessage =
+                        String.format(
+                                "%s, field index: %s, field value: %s.",
+                                e.getMessage(), index, field);
+                ClassCastException enrichedException = new ClassCastException(errorMessage);
+                // keep the original stack trace so the failing cast can still be located
+                enrichedException.setStackTrace(e.getStackTrace());
+                throw enrichedException;
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java
new file mode 100644
index 00000000..8119da4d
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/ThrowingRunnable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+
+/**
+ * A {@link Runnable}-like block of code whose {@link #run()} method is allowed to throw checked
+ * exceptions, which a plain {@code Runnable} cannot do.
+ *
+ * @param <E> the type of exception the work method may throw
+ */
+@FunctionalInterface
+public interface ThrowingRunnable<E extends Throwable> {
+
+    /**
+     * Executes the captured block of code.
+     *
+     * @throws E Exceptions may be thrown.
+     */
+    void run() throws E;
+
+    /**
+     * Adapts a {@link ThrowingRunnable} to a plain {@link Runnable}.
+     *
+     * <p>Whatever the wrapped block throws is handed to {@link ExceptionUtils#rethrow(Throwable)},
+     * so checked exceptions surface as unchecked ones.
+     *
+     * @param throwingRunnable to convert into a {@link Runnable}
+     * @return {@link Runnable} which throws all checked exceptions as unchecked.
+     */
+    static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
+        return () -> {
+            try {
+                throwingRunnable.run();
+            } catch (Throwable failure) {
+                // errors and runtime exceptions pass through as is; the rest gets wrapped
+                ExceptionUtils.rethrow(failure);
+            }
+        };
+    }
+}