You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/01/14 02:46:53 UTC

[incubator-seatunnel] branch dev updated: [Bug][CDC] Fix jdbc sink generate update sql (#3940)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 233465d4e [Bug][CDC] Fix jdbc sink generate update sql (#3940)
233465d4e is described below

commit 233465d4e4fd7859d05cfd8e27029d440f6d136f
Author: hailin0 <wa...@apache.org>
AuthorDate: Sat Jan 14 10:46:47 2023 +0800

    [Bug][CDC] Fix jdbc sink generate update sql (#3940)
---
 .../jdbc/internal/JdbcOutputFormatBuilder.java     |  34 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |  10 +-
 .../internal/dialect/oracle/OracleDialect.java     |   2 +-
 .../dialect/sqlserver/SqlServerDialect.java        |   2 +-
 .../executor/FieldNamedPreparedStatement.java      | 668 +++++++++++++++++++++
 .../seatunnel/jdbc/JdbcSinkCDCChangelogIT.java     | 133 ++++
 .../test/resources/jdbc_sink_cdc_changelog.conf    |  71 +++
 7 files changed, 906 insertions(+), 14 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 18d75bfea..56d725dc4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
@@ -33,12 +34,14 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBa
 import com.google.common.base.Strings;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Function;
 import java.util.function.IntFunction;
 
+@Slf4j
 @RequiredArgsConstructor
 public class JdbcOutputFormatBuilder {
     @NonNull
@@ -132,8 +135,14 @@ public class JdbcOutputFormatBuilder {
                                                                                          String[] pkNames) {
 
         return new InsertOrUpdateBatchStatementExecutor(
-            connection -> connection.prepareStatement(dialect.getInsertIntoStatement(table, rowType.getFieldNames())),
-            connection -> connection.prepareStatement(dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames)),
+            connection -> FieldNamedPreparedStatement.prepareStatement(
+                connection,
+                dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
+                rowType.getFieldNames()),
+            connection -> FieldNamedPreparedStatement.prepareStatement(
+                connection,
+                dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames),
+                rowType.getFieldNames()),
             rowType,
             dialect.getRowConverter());
     }
@@ -144,12 +153,20 @@ public class JdbcOutputFormatBuilder {
                                                                                                 String[] pkNames,
                                                                                                 SeaTunnelDataType[] pkTypes,
                                                                                                 Function<SeaTunnelRow, SeaTunnelRow> keyExtractor) {
-
         SeaTunnelRowType keyRowType = new SeaTunnelRowType(pkNames, pkTypes);
         return new InsertOrUpdateBatchStatementExecutor(
-            connection -> connection.prepareStatement(dialect.getRowExistsStatement(table, pkNames)),
-            connection -> connection.prepareStatement(dialect.getInsertIntoStatement(table, rowType.getFieldNames())),
-            connection -> connection.prepareStatement(dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames)),
+            connection -> FieldNamedPreparedStatement.prepareStatement(
+                connection,
+                dialect.getRowExistsStatement(table, pkNames),
+                pkNames),
+            connection -> FieldNamedPreparedStatement.prepareStatement(
+                connection,
+                dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
+                rowType.getFieldNames()),
+            connection -> FieldNamedPreparedStatement.prepareStatement(
+                connection,
+                dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames),
+                rowType.getFieldNames()),
             keyRowType,
             keyExtractor,
             rowType,
@@ -176,7 +193,10 @@ public class JdbcOutputFormatBuilder {
                                                                                  SeaTunnelRowType rowType,
                                                                                  JdbcRowConverter rowConverter) {
         return new SimpleBatchStatementExecutor(
-            connection -> connection.prepareStatement(sql),
+            connection -> FieldNamedPreparedStatement.prepareStatement(
+                connection,
+                sql,
+                rowType.getFieldNames()),
             rowType,
             rowConverter);
     }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index d04fa18d4..2d52dbe2a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -85,7 +85,7 @@ public interface JdbcDialect extends Serializable {
             .map(this::quoteIdentifier)
             .collect(Collectors.joining(", "));
         String placeholders = Arrays.stream(fieldNames)
-            .map(fieldName -> "?")
+            .map(fieldName -> ":" + fieldName)
             .collect(Collectors.joining(", "));
         return String.format("INSERT INTO %s (%s) VALUES (%s)",
             quoteIdentifier(tableName), columns, placeholders);
@@ -104,10 +104,10 @@ public interface JdbcDialect extends Serializable {
      */
     default String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
         String setClause = Arrays.stream(fieldNames)
-            .map(fieldName -> String.format("%s = ?", quoteIdentifier(fieldName)))
+            .map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), fieldName))
             .collect(Collectors.joining(", "));
         String conditionClause = Arrays.stream(conditionFields)
-            .map(fieldName -> String.format("%s = ?", quoteIdentifier(fieldName)))
+            .map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), fieldName))
             .collect(Collectors.joining(" AND "));
         return String.format("UPDATE %s SET %s WHERE %s",
             quoteIdentifier(tableName), setClause, conditionClause);
@@ -126,7 +126,7 @@ public interface JdbcDialect extends Serializable {
      */
     default String getDeleteStatement(String tableName, String[] conditionFields) {
         String conditionClause = Arrays.stream(conditionFields)
-            .map(fieldName -> format("%s = ?", quoteIdentifier(fieldName)))
+            .map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), fieldName))
             .collect(Collectors.joining(" AND "));
         return String.format("DELETE FROM %s WHERE %s",
             quoteIdentifier(tableName), conditionClause);
@@ -144,7 +144,7 @@ public interface JdbcDialect extends Serializable {
      */
     default String getRowExistsStatement(String tableName, String[] conditionFields) {
         String fieldExpressions = Arrays.stream(conditionFields)
-            .map(field -> format("%s = ?", quoteIdentifier(field)))
+            .map(field -> format("%s = :%s", quoteIdentifier(field), field))
             .collect(Collectors.joining(" AND "));
         return String.format("SELECT 1 FROM %s WHERE %s",
             quoteIdentifier(tableName), fieldExpressions);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index f3fe59ab6..cadf4bd16 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -55,7 +55,7 @@ public class OracleDialect implements JdbcDialect {
             .filter(fieldName -> !Arrays.asList(uniqueKeyFields).contains(fieldName))
             .collect(Collectors.toList());
         String valuesBinding = Arrays.stream(fieldNames)
-            .map(fieldName -> "? " + quoteIdentifier(fieldName))
+            .map(fieldName -> ":" + fieldName + " " + quoteIdentifier(fieldName))
             .collect(Collectors.joining(", "));
 
         String usingClause = String.format("SELECT %s FROM DUAL", valuesBinding);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 314cb1759..0ac734dc0 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -48,7 +48,7 @@ public class SqlServerDialect implements JdbcDialect {
             .filter(fieldName -> !Arrays.asList(uniqueKeyFields).contains(fieldName))
             .collect(Collectors.toList());
         String valuesBinding = Arrays.stream(fieldNames)
-            .map(fieldName -> "? " + quoteIdentifier(fieldName))
+            .map(fieldName -> ":" + fieldName + " " + quoteIdentifier(fieldName))
             .collect(Collectors.joining(", "));
 
         String usingClause = String.format("SELECT %s", valuesBinding);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
new file mode 100644
index 000000000..c4d5ca3c8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import lombok.RequiredArgsConstructor;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RequiredArgsConstructor
+public class FieldNamedPreparedStatement implements PreparedStatement {
+    private final PreparedStatement statement;
+    private final int[][] indexMapping;
+
+    @Override
+    public void setNull(int parameterIndex, int sqlType) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setNull(index, sqlType);
+        }
+    }
+
+    @Override
+    public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setBoolean(index, x);
+        }
+    }
+
+    @Override
+    public void setByte(int parameterIndex, byte x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setByte(index, x);
+        }
+    }
+
+    @Override
+    public void setShort(int parameterIndex, short x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setShort(index, x);
+        }
+    }
+
+    @Override
+    public void setInt(int parameterIndex, int x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setInt(index, x);
+        }
+    }
+
+    @Override
+    public void setLong(int parameterIndex, long x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setLong(index, x);
+        }
+    }
+
+    @Override
+    public void setFloat(int parameterIndex, float x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setFloat(index, x);
+        }
+    }
+
+    @Override
+    public void setDouble(int parameterIndex, double x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setDouble(index, x);
+        }
+    }
+
+    @Override
+    public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setBigDecimal(index, x);
+        }
+    }
+
+    @Override
+    public void setString(int parameterIndex, String x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setString(index, x);
+        }
+    }
+
+    @Override
+    public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setBytes(index, x);
+        }
+    }
+
+    @Override
+    public void setDate(int parameterIndex, Date x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setDate(index, x);
+        }
+    }
+
+    @Override
+    public void setTime(int parameterIndex, Time x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setTime(index, x);
+        }
+    }
+
+    @Override
+    public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setTimestamp(index, x);
+        }
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setObject(index, x, targetSqlType);
+        }
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setObject(index, x);
+        }
+    }
+
+    @Override
+    public void setRef(int parameterIndex, Ref x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setRef(index, x);
+        }
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, Blob x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setBlob(index, x);
+        }
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Clob x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setClob(index, x);
+        }
+    }
+
+    @Override
+    public void setArray(int parameterIndex, Array x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setArray(index, x);
+        }
+    }
+
+    @Override
+    public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setDate(index, x, cal);
+        }
+    }
+
+    @Override
+    public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setTime(index, x, cal);
+        }
+    }
+
+    @Override
+    public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setTimestamp(index, x, cal);
+        }
+    }
+
+    @Override
+    public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setNull(index, sqlType, typeName);
+        }
+    }
+
+    @Override
+    public void setURL(int parameterIndex, URL x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setURL(index, x);
+        }
+    }
+
+    @Override
+    public void setRowId(int parameterIndex, RowId x) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setRowId(index, x);
+        }
+    }
+
+    @Override
+    public void setNString(int parameterIndex, String value) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setNString(index, value);
+        }
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, NClob value) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setNClob(index, value);
+        }
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setSQLXML(index, xmlObject);
+        }
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
+        for (int index : indexMapping[parameterIndex - 1]) {
+            statement.setObject(index, x, targetSqlType, scaleOrLength);
+        }
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Reader reader) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean execute() throws SQLException {
+        return statement.execute();
+    }
+
+    @Override
+    public void addBatch() throws SQLException {
+        statement.addBatch();
+    }
+
+    @Override
+    public ResultSet executeQuery() throws SQLException {
+        return statement.executeQuery();
+    }
+
+    @Override
+    public int executeUpdate() throws SQLException {
+        return statement.executeUpdate();
+    }
+
+    @Override
+    public void clearParameters() throws SQLException {
+        statement.clearParameters();
+    }
+
+    @Override
+    public ResultSetMetaData getMetaData() throws SQLException {
+        return statement.getMetaData();
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() throws SQLException {
+        return statement.getParameterMetaData();
+    }
+
+    @Override
+    public ResultSet executeQuery(String sql) throws SQLException {
+        return statement.executeQuery(sql);
+    }
+
+    @Override
+    public int executeUpdate(String sql) throws SQLException {
+        return statement.executeUpdate(sql);
+    }
+
+    @Override
+    public void close() throws SQLException {
+        statement.close();
+    }
+
+    @Override
+    public int getMaxFieldSize() throws SQLException {
+        return statement.getMaxFieldSize();
+    }
+
+    @Override
+    public void setMaxFieldSize(int max) throws SQLException {
+        statement.setMaxFieldSize(max);
+    }
+
+    @Override
+    public int getMaxRows() throws SQLException {
+        return statement.getMaxRows();
+    }
+
+    @Override
+    public void setMaxRows(int max) throws SQLException {
+        statement.setMaxRows(max);
+    }
+
+    @Override
+    public void setEscapeProcessing(boolean enable) throws SQLException {
+        statement.setEscapeProcessing(enable);
+    }
+
+    @Override
+    public int getQueryTimeout() throws SQLException {
+        return statement.getQueryTimeout();
+    }
+
+    @Override
+    public void setQueryTimeout(int seconds) throws SQLException {
+        statement.setQueryTimeout(seconds);
+    }
+
+    @Override
+    public void cancel() throws SQLException {
+        statement.cancel();
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return statement.getWarnings();
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+        statement.clearWarnings();
+    }
+
+    @Override
+    public void setCursorName(String name) throws SQLException {
+        statement.setCursorName(name);
+    }
+
+    @Override
+    public boolean execute(String sql) throws SQLException {
+        return statement.execute(sql);
+    }
+
+    @Override
+    public ResultSet getResultSet() throws SQLException {
+        return statement.getResultSet();
+    }
+
+    @Override
+    public int getUpdateCount() throws SQLException {
+        return statement.getUpdateCount();
+    }
+
+    @Override
+    public boolean getMoreResults() throws SQLException {
+        return statement.getMoreResults();
+    }
+
+    @Override
+    public void setFetchDirection(int direction) throws SQLException {
+        statement.setFetchDirection(direction);
+    }
+
+    @Override
+    public int getFetchDirection() throws SQLException {
+        return statement.getFetchDirection();
+    }
+
+    @Override
+    public void setFetchSize(int rows) throws SQLException {
+        statement.setFetchSize(rows);
+    }
+
+    @Override
+    public int getFetchSize() throws SQLException {
+        return statement.getFetchSize();
+    }
+
+    @Override
+    public int getResultSetConcurrency() throws SQLException {
+        return statement.getResultSetConcurrency();
+    }
+
+    @Override
+    public int getResultSetType() throws SQLException {
+        return statement.getResultSetType();
+    }
+
+    @Override
+    public void addBatch(String sql) throws SQLException {
+        statement.addBatch(sql);
+    }
+
+    @Override
+    public void clearBatch() throws SQLException {
+        statement.clearBatch();
+    }
+
+    @Override
+    public int[] executeBatch() throws SQLException {
+        return statement.executeBatch();
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+        return statement.getConnection();
+    }
+
+    @Override
+    public boolean getMoreResults(int current) throws SQLException {
+        return statement.getMoreResults(current);
+    }
+
+    @Override
+    public ResultSet getGeneratedKeys() throws SQLException {
+        return statement.getGeneratedKeys();
+    }
+
+    @Override
+    public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+        return statement.executeUpdate(sql, autoGeneratedKeys);
+    }
+
+    @Override
+    public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+        return statement.executeUpdate(sql, columnIndexes);
+    }
+
+    @Override
+    public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+        return statement.executeUpdate(sql, columnNames);
+    }
+
+    @Override
+    public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+        return statement.execute(sql, autoGeneratedKeys);
+    }
+
+    @Override
+    public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+        return statement.execute(sql, columnIndexes);
+    }
+
+    @Override
+    public boolean execute(String sql, String[] columnNames) throws SQLException {
+        return statement.execute(sql, columnNames);
+    }
+
+    @Override
+    public int getResultSetHoldability() throws SQLException {
+        return statement.getResultSetHoldability();
+    }
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        return statement.isClosed();
+    }
+
+    @Override
+    public void setPoolable(boolean poolable) throws SQLException {
+        statement.setPoolable(poolable);
+    }
+
+    @Override
+    public boolean isPoolable() throws SQLException {
+        return statement.isPoolable();
+    }
+
+    @Override
+    public void closeOnCompletion() throws SQLException {
+        statement.closeOnCompletion();
+    }
+
+    @Override
+    public boolean isCloseOnCompletion() throws SQLException {
+        return statement.isCloseOnCompletion();
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        return statement.unwrap(iface);
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return statement.isWrapperFor(iface);
+    }
+
+    /**
+     * Creates a {@link FieldNamedPreparedStatement} for {@code sql}, mapping the position of each
+     * entry in {@code fieldNames} to the JDBC parameter index(es) it must populate.
+     *
+     * <p>If {@code sql} contains a {@code ?}, it is handed to the driver unchanged and field
+     * {@code i} is bound to parameter {@code i + 1}. Otherwise {@code :name} placeholders are
+     * rewritten to {@code ?} by {@link #parseNamedStatement(String, Map)} and each field is bound
+     * to every position at which its name occurred.
+     *
+     * @param connection connection used to prepare the resulting statement
+     * @param sql statement text using either {@code ?} or {@code :name} placeholders
+     * @param fieldNames field names, in the order the 1-based setters will address them
+     * @return a wrapper that fans each setter call out to the mapped parameter indexes
+     * @throws NullPointerException if any argument is {@code null}
+     * @throws IllegalArgumentException if the named parameters found in {@code sql} do not match
+     *     {@code fieldNames}
+     * @throws SQLException if the driver fails to prepare the statement
+     */
+    public static FieldNamedPreparedStatement prepareStatement(
+        Connection connection, String sql, String[] fieldNames) throws SQLException {
+        checkNotNull(connection, "connection must not be null.");
+        checkNotNull(sql, "sql must not be null.");
+        checkNotNull(fieldNames, "fieldNames must not be null.");
+
+        int[][] indexMapping = new int[fieldNames.length][];
+        String parsedSQL;
+        if (sql.contains("?")) {
+            // Positional statement: keep it as-is and map field i to parameter i + 1.
+            parsedSQL = sql;
+            for (int i = 0; i < fieldNames.length; i++) {
+                // SQL statement parameter index starts from 1
+                indexMapping[i] = new int[] {i + 1};
+            }
+        } else {
+            Map<String, List<Integer>> parameterMap = new HashMap<>();
+            parsedSQL = parseNamedStatement(sql, parameterMap);
+            // currently, the statements must contain all the field parameters
+            checkArgument(
+                parameterMap.size() == fieldNames.length,
+                "Expected " + fieldNames.length + " distinct named parameters but found "
+                    + parameterMap.keySet() + " in SQL statement: " + sql);
+            for (int i = 0; i < fieldNames.length; i++) {
+                String fieldName = fieldNames[i];
+                checkArgument(
+                    parameterMap.containsKey(fieldName),
+                    fieldName + " doesn't exist in the parameters of SQL statement: " + sql);
+                // A name may occur several times (e.g. in SET and WHERE); bind all occurrences.
+                indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray();
+            }
+        }
+        return new FieldNamedPreparedStatement(
+            connection.prepareStatement(parsedSQL), indexMapping);
+    }
+
+    /**
+     * Rewrites {@code :name} placeholders in {@code sql} to JDBC {@code ?} markers and records in
+     * {@code paramMap}, per name, the 1-based marker positions that name occupies.
+     *
+     * <p>Text between matching quote characters ({@code '}, {@code "} or {@code `}) and
+     * {@code ::} cast operators are copied through unchanged, so colons inside literals or quoted
+     * identifiers are not mistaken for placeholders. Backslash-escaped quotes are not recognised;
+     * a quote without a closing partner is treated as an ordinary character.
+     *
+     * @param sql statement text containing {@code :name} placeholders
+     * @param paramMap output map, filled with name to parameter-position list
+     * @return the statement text with every placeholder replaced by {@code ?}
+     * @throws IllegalArgumentException if a single {@code :} is not followed by an identifier
+     */
+    public static String parseNamedStatement(String sql, Map<String, List<Integer>> paramMap) {
+        StringBuilder parsedSql = new StringBuilder();
+        int fieldIndex = 1; // SQL statement parameter index starts from 1
+        int length = sql.length();
+        for (int i = 0; i < length; i++) {
+            char c = sql.charAt(i);
+            int end = i + 1; // exclusive end of the span that starts at i
+            if (c == '\'' || c == '"' || c == '`') {
+                // Copy the whole quoted span; with no closing quote only the quote char is copied.
+                end = Math.max(sql.indexOf(c, i + 1), i) + 1;
+            } else if (sql.startsWith("::", i)) {
+                // Cast operator, not a placeholder.
+                end = i + 2;
+            } else if (c == ':') {
+                while (end < length && Character.isJavaIdentifierPart(sql.charAt(end))) {
+                    end++;
+                }
+                String parameterName = sql.substring(i + 1, end);
+                if (parameterName.isEmpty()) {
+                    throw new IllegalArgumentException(
+                        "Named parameters in SQL statement must not be empty.");
+                }
+                paramMap.computeIfAbsent(parameterName, n -> new ArrayList<>()).add(fieldIndex);
+                fieldIndex++;
+                parsedSql.append('?');
+                i = end - 1;
+                continue;
+            }
+            parsedSql.append(sql, i, end);
+            i = end - 1;
+        }
+        return parsedSql.toString();
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
new file mode 100644
index 000000000..38e8fc4ae
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcSinkCDCChangelogIT extends TestSuiteBase implements TestResource {
+    private static final String PG_IMAGE = "postgres:alpine3.16";
+    private static final String PG_DRIVER_JAR = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+    private PostgreSQLContainer<?> postgreSQLContainer;
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory = container -> {
+        Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + PG_DRIVER_JAR);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    };
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        postgreSQLContainer = new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+            .withNetwork(NETWORK)
+            .withNetworkAliases("postgresql")
+            .withExposedPorts(5432)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+        Startables.deepStart(Stream.of(postgreSQLContainer)).join();
+        log.info("PostgreSQL container started");
+        Class.forName(postgreSQLContainer.getDriverClassName());
+        given().ignoreExceptions()
+            .await()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(2, TimeUnit.MINUTES)
+            .untilAsserted(() -> initializeJdbcTable());
+    }
+
+    @TestTemplate
+    public void testSinkCDCChangelog(TestContainer container) throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = container.executeJob("/jdbc_sink_cdc_changelog.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        Set<List<Object>> actual = new HashSet<>();
+        try (Connection connection = DriverManager.getConnection(postgreSQLContainer.getJdbcUrl(),
+            postgreSQLContainer.getUsername(), postgreSQLContainer.getPassword())) {
+            try (Statement statement = connection.createStatement()) {
+                ResultSet resultSet = statement.executeQuery("select * from sink");
+                while (resultSet.next()) {
+                    List<Object> row = Arrays.asList(
+                        resultSet.getLong("pk_id"),
+                        resultSet.getString("name"),
+                        resultSet.getInt("score"));
+                    actual.add(row);
+                }
+            }
+        }
+        Set<List<Object>> expected = Stream.<List<Object>>of(
+                Arrays.asList(1L, "A_1", 100),
+                Arrays.asList(3L, "C", 100))
+            .collect(Collectors.toSet());
+        Assertions.assertIterableEquals(expected, actual);
+    }
+
+    private void initializeJdbcTable() {
+        try (Connection connection = DriverManager.getConnection(postgreSQLContainer.getJdbcUrl(),
+            postgreSQLContainer.getUsername(), postgreSQLContainer.getPassword())) {
+            Statement statement = connection.createStatement();
+            String sink = "create table sink(\n" +
+                "pk_id BIGINT NOT NULL PRIMARY KEY,\n" +
+                "name varchar(255),\n" +
+                "score INT\n" +
+                ")";
+            statement.execute(sink);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (postgreSQLContainer != null) {
+            postgreSQLContainer.stop();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
new file mode 100644
index 000000000..188c7bf5d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Job-level settings: a batch job with a parallelism of 1
+# (presumably so the changelog rows below are applied in the listed order - confirm).
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+# FakeSource emitting a fixed changelog over the schema (pk_id, name, score):
+# three INSERTs, an UPDATE_BEFORE/UPDATE_AFTER pair for pk_id = 1, and a DELETE for pk_id = 2.
+# JdbcSinkCDCChangelogIT expects the sink table to end up holding exactly
+# (1, "A_1", 100) and (3, "C", 100).
+source {
+    FakeSource {
+        schema = {
+            fields {
+                pk_id = bigint
+                name = string
+                score = int
+            }
+        }
+        rows = [
+            # Initial rows.
+            {
+                kind = INSERT
+                fields = [1, "A", 100]
+            },
+            {
+                kind = INSERT
+                fields = [2, "B", 100]
+            },
+            {
+                kind = INSERT
+                fields = [3, "C", 100]
+            },
+            # Update of pk_id = 1: old image followed by new image (name "A" -> "A_1").
+            {
+                kind = UPDATE_BEFORE
+                fields = [1, "A", 100]
+            },
+            {
+                kind = UPDATE_AFTER
+                fields = [1, "A_1", 100]
+            },
+            # Removal of pk_id = 2.
+            {
+                kind = DELETE
+                fields = [2, "B", 100]
+            }
+        ]
+    }
+}
+
+# JDBC sink writing to the "sink" table of the PostgreSQL container, which the test
+# exposes on the shared network under the alias "postgresql".
+sink {
+    Jdbc {
+        driver = org.postgresql.Driver
+        url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+        # NOTE(review): database/user/password "test" presumably match the PostgreSQL
+        # test container defaults - confirm.
+        user = test
+        password = test
+
+        table = sink
+        # pk_id is the primary key column of the table created by the test.
+        primary_keys = ["pk_id"]
+    }
+}
\ No newline at end of file