You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/01/14 02:46:53 UTC
[incubator-seatunnel] branch dev updated: [Bug][CDC] Fix jdbc sink generate update sql (#3940)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 233465d4e [Bug][CDC] Fix jdbc sink generate update sql (#3940)
233465d4e is described below
commit 233465d4e4fd7859d05cfd8e27029d440f6d136f
Author: hailin0 <wa...@apache.org>
AuthorDate: Sat Jan 14 10:46:47 2023 +0800
[Bug][CDC] Fix jdbc sink generate update sql (#3940)
---
.../jdbc/internal/JdbcOutputFormatBuilder.java | 34 +-
.../jdbc/internal/dialect/JdbcDialect.java | 10 +-
.../internal/dialect/oracle/OracleDialect.java | 2 +-
.../dialect/sqlserver/SqlServerDialect.java | 2 +-
.../executor/FieldNamedPreparedStatement.java | 668 +++++++++++++++++++++
.../seatunnel/jdbc/JdbcSinkCDCChangelogIT.java | 133 ++++
.../test/resources/jdbc_sink_cdc_changelog.conf | 71 +++
7 files changed, 906 insertions(+), 14 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
index 18d75bfea..56d725dc4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcOutputFormatBuilder.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRow
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferReducedBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.InsertOrUpdateBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor;
@@ -33,12 +34,14 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBa
import com.google.common.base.Strings;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.IntFunction;
+@Slf4j
@RequiredArgsConstructor
public class JdbcOutputFormatBuilder {
@NonNull
@@ -132,8 +135,14 @@ public class JdbcOutputFormatBuilder {
String[] pkNames) {
return new InsertOrUpdateBatchStatementExecutor(
- connection -> connection.prepareStatement(dialect.getInsertIntoStatement(table, rowType.getFieldNames())),
- connection -> connection.prepareStatement(dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames)),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection,
+ dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
+ rowType.getFieldNames()),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection,
+ dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames),
+ rowType.getFieldNames()),
rowType,
dialect.getRowConverter());
}
@@ -144,12 +153,20 @@ public class JdbcOutputFormatBuilder {
String[] pkNames,
SeaTunnelDataType[] pkTypes,
Function<SeaTunnelRow, SeaTunnelRow> keyExtractor) {
-
SeaTunnelRowType keyRowType = new SeaTunnelRowType(pkNames, pkTypes);
return new InsertOrUpdateBatchStatementExecutor(
- connection -> connection.prepareStatement(dialect.getRowExistsStatement(table, pkNames)),
- connection -> connection.prepareStatement(dialect.getInsertIntoStatement(table, rowType.getFieldNames())),
- connection -> connection.prepareStatement(dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames)),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection,
+ dialect.getRowExistsStatement(table, pkNames),
+ pkNames),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection,
+ dialect.getInsertIntoStatement(table, rowType.getFieldNames()),
+ rowType.getFieldNames()),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection,
+ dialect.getUpdateStatement(table, rowType.getFieldNames(), pkNames),
+ rowType.getFieldNames()),
keyRowType,
keyExtractor,
rowType,
@@ -176,7 +193,10 @@ public class JdbcOutputFormatBuilder {
SeaTunnelRowType rowType,
JdbcRowConverter rowConverter) {
return new SimpleBatchStatementExecutor(
- connection -> connection.prepareStatement(sql),
+ connection -> FieldNamedPreparedStatement.prepareStatement(
+ connection,
+ sql,
+ rowType.getFieldNames()),
rowType,
rowConverter);
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index d04fa18d4..2d52dbe2a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -85,7 +85,7 @@ public interface JdbcDialect extends Serializable {
.map(this::quoteIdentifier)
.collect(Collectors.joining(", "));
String placeholders = Arrays.stream(fieldNames)
- .map(fieldName -> "?")
+ .map(fieldName -> ":" + fieldName)
.collect(Collectors.joining(", "));
return String.format("INSERT INTO %s (%s) VALUES (%s)",
quoteIdentifier(tableName), columns, placeholders);
@@ -104,10 +104,10 @@ public interface JdbcDialect extends Serializable {
*/
default String getUpdateStatement(String tableName, String[] fieldNames, String[] conditionFields) {
String setClause = Arrays.stream(fieldNames)
- .map(fieldName -> String.format("%s = ?", quoteIdentifier(fieldName)))
+ .map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), fieldName))
.collect(Collectors.joining(", "));
String conditionClause = Arrays.stream(conditionFields)
- .map(fieldName -> String.format("%s = ?", quoteIdentifier(fieldName)))
+ .map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), fieldName))
.collect(Collectors.joining(" AND "));
return String.format("UPDATE %s SET %s WHERE %s",
quoteIdentifier(tableName), setClause, conditionClause);
@@ -126,7 +126,7 @@ public interface JdbcDialect extends Serializable {
*/
default String getDeleteStatement(String tableName, String[] conditionFields) {
String conditionClause = Arrays.stream(conditionFields)
- .map(fieldName -> format("%s = ?", quoteIdentifier(fieldName)))
+ .map(fieldName -> format("%s = :%s", quoteIdentifier(fieldName), fieldName))
.collect(Collectors.joining(" AND "));
return String.format("DELETE FROM %s WHERE %s",
quoteIdentifier(tableName), conditionClause);
@@ -144,7 +144,7 @@ public interface JdbcDialect extends Serializable {
*/
default String getRowExistsStatement(String tableName, String[] conditionFields) {
String fieldExpressions = Arrays.stream(conditionFields)
- .map(field -> format("%s = ?", quoteIdentifier(field)))
+ .map(field -> format("%s = :%s", quoteIdentifier(field), field))
.collect(Collectors.joining(" AND "));
return String.format("SELECT 1 FROM %s WHERE %s",
quoteIdentifier(tableName), fieldExpressions);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index f3fe59ab6..cadf4bd16 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -55,7 +55,7 @@ public class OracleDialect implements JdbcDialect {
.filter(fieldName -> !Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
String valuesBinding = Arrays.stream(fieldNames)
- .map(fieldName -> "? " + quoteIdentifier(fieldName))
+ .map(fieldName -> ":" + fieldName + " " + quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));
String usingClause = String.format("SELECT %s FROM DUAL", valuesBinding);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
index 314cb1759..0ac734dc0 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java
@@ -48,7 +48,7 @@ public class SqlServerDialect implements JdbcDialect {
.filter(fieldName -> !Arrays.asList(uniqueKeyFields).contains(fieldName))
.collect(Collectors.toList());
String valuesBinding = Arrays.stream(fieldNames)
- .map(fieldName -> "? " + quoteIdentifier(fieldName))
+ .map(fieldName -> ":" + fieldName + " " + quoteIdentifier(fieldName))
.collect(Collectors.joining(", "));
String usingClause = String.format("SELECT %s", valuesBinding);
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
new file mode 100644
index 000000000..c4d5ca3c8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import lombok.RequiredArgsConstructor;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RequiredArgsConstructor
+public class FieldNamedPreparedStatement implements PreparedStatement {
+ private final PreparedStatement statement;
+ private final int[][] indexMapping;
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setNull(index, sqlType);
+ }
+ }
+
+ @Override
+ public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setBoolean(index, x);
+ }
+ }
+
+ @Override
+ public void setByte(int parameterIndex, byte x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setByte(index, x);
+ }
+ }
+
+ @Override
+ public void setShort(int parameterIndex, short x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setShort(index, x);
+ }
+ }
+
+ @Override
+ public void setInt(int parameterIndex, int x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setInt(index, x);
+ }
+ }
+
+ @Override
+ public void setLong(int parameterIndex, long x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setLong(index, x);
+ }
+ }
+
+ @Override
+ public void setFloat(int parameterIndex, float x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setFloat(index, x);
+ }
+ }
+
+ @Override
+ public void setDouble(int parameterIndex, double x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setDouble(index, x);
+ }
+ }
+
+ @Override
+ public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setBigDecimal(index, x);
+ }
+ }
+
+ @Override
+ public void setString(int parameterIndex, String x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setString(index, x);
+ }
+ }
+
+ @Override
+ public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setBytes(index, x);
+ }
+ }
+
+ @Override
+ public void setDate(int parameterIndex, Date x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setDate(index, x);
+ }
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setTime(index, x);
+ }
+ }
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setTimestamp(index, x);
+ }
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setObject(index, x, targetSqlType);
+ }
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setObject(index, x);
+ }
+ }
+
+ @Override
+ public void setRef(int parameterIndex, Ref x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setRef(index, x);
+ }
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, Blob x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setBlob(index, x);
+ }
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Clob x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setClob(index, x);
+ }
+ }
+
+ @Override
+ public void setArray(int parameterIndex, Array x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setArray(index, x);
+ }
+ }
+
+ @Override
+ public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setDate(index, x, cal);
+ }
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setTime(index, x, cal);
+ }
+ }
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setTimestamp(index, x, cal);
+ }
+ }
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setNull(index, sqlType, typeName);
+ }
+ }
+
+ @Override
+ public void setURL(int parameterIndex, URL x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setURL(index, x);
+ }
+ }
+
+ @Override
+ public void setRowId(int parameterIndex, RowId x) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setRowId(index, x);
+ }
+ }
+
+ @Override
+ public void setNString(int parameterIndex, String value) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setNString(index, value);
+ }
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, NClob value) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setNClob(index, value);
+ }
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setSQLXML(index, xmlObject);
+ }
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
+ for (int index : indexMapping[parameterIndex - 1]) {
+ statement.setObject(index, x, targetSqlType, scaleOrLength);
+ }
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ return statement.execute();
+ }
+
+ @Override
+ public void addBatch() throws SQLException {
+ statement.addBatch();
+ }
+
+ @Override
+ public ResultSet executeQuery() throws SQLException {
+ return statement.executeQuery();
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ return statement.executeUpdate();
+ }
+
+ @Override
+ public void clearParameters() throws SQLException {
+ statement.clearParameters();
+ }
+
+ @Override
+ public ResultSetMetaData getMetaData() throws SQLException {
+ return statement.getMetaData();
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() throws SQLException {
+ return statement.getParameterMetaData();
+ }
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException {
+ return statement.executeQuery(sql);
+ }
+
+ @Override
+ public int executeUpdate(String sql) throws SQLException {
+ return statement.executeUpdate(sql);
+ }
+
+ @Override
+ public void close() throws SQLException {
+ statement.close();
+ }
+
+ @Override
+ public int getMaxFieldSize() throws SQLException {
+ return statement.getMaxFieldSize();
+ }
+
+ @Override
+ public void setMaxFieldSize(int max) throws SQLException {
+ statement.setMaxFieldSize(max);
+ }
+
+ @Override
+ public int getMaxRows() throws SQLException {
+ return statement.getMaxRows();
+ }
+
+ @Override
+ public void setMaxRows(int max) throws SQLException {
+ statement.setMaxRows(max);
+ }
+
+ @Override
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ statement.setEscapeProcessing(enable);
+ }
+
+ @Override
+ public int getQueryTimeout() throws SQLException {
+ return statement.getQueryTimeout();
+ }
+
+ @Override
+ public void setQueryTimeout(int seconds) throws SQLException {
+ statement.setQueryTimeout(seconds);
+ }
+
+ @Override
+ public void cancel() throws SQLException {
+ statement.cancel();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return statement.getWarnings();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ statement.clearWarnings();
+ }
+
+ @Override
+ public void setCursorName(String name) throws SQLException {
+ statement.setCursorName(name);
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException {
+ return statement.execute(sql);
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ return statement.getResultSet();
+ }
+
+ @Override
+ public int getUpdateCount() throws SQLException {
+ return statement.getUpdateCount();
+ }
+
+ @Override
+ public boolean getMoreResults() throws SQLException {
+ return statement.getMoreResults();
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {
+ statement.setFetchDirection(direction);
+ }
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ return statement.getFetchDirection();
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {
+ statement.setFetchSize(rows);
+ }
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ return statement.getFetchSize();
+ }
+
+ @Override
+ public int getResultSetConcurrency() throws SQLException {
+ return statement.getResultSetConcurrency();
+ }
+
+ @Override
+ public int getResultSetType() throws SQLException {
+ return statement.getResultSetType();
+ }
+
+ @Override
+ public void addBatch(String sql) throws SQLException {
+ statement.addBatch(sql);
+ }
+
+ @Override
+ public void clearBatch() throws SQLException {
+ statement.clearBatch();
+ }
+
+ @Override
+ public int[] executeBatch() throws SQLException {
+ return statement.executeBatch();
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return statement.getConnection();
+ }
+
+ @Override
+ public boolean getMoreResults(int current) throws SQLException {
+ return statement.getMoreResults(current);
+ }
+
+ @Override
+ public ResultSet getGeneratedKeys() throws SQLException {
+ return statement.getGeneratedKeys();
+ }
+
+ @Override
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ return statement.executeUpdate(sql, autoGeneratedKeys);
+ }
+
+ @Override
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ return statement.executeUpdate(sql, columnIndexes);
+ }
+
+ @Override
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ return statement.executeUpdate(sql, columnNames);
+ }
+
+ @Override
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ return statement.execute(sql, autoGeneratedKeys);
+ }
+
+ @Override
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ return statement.execute(sql, columnIndexes);
+ }
+
+ @Override
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ return statement.execute(sql, columnNames);
+ }
+
+ @Override
+ public int getResultSetHoldability() throws SQLException {
+ return statement.getResultSetHoldability();
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return statement.isClosed();
+ }
+
+ @Override
+ public void setPoolable(boolean poolable) throws SQLException {
+ statement.setPoolable(poolable);
+ }
+
+ @Override
+ public boolean isPoolable() throws SQLException {
+ return statement.isPoolable();
+ }
+
+ @Override
+ public void closeOnCompletion() throws SQLException {
+ statement.closeOnCompletion();
+ }
+
+ @Override
+ public boolean isCloseOnCompletion() throws SQLException {
+ return statement.isCloseOnCompletion();
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return statement.unwrap(iface);
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return statement.isWrapperFor(iface);
+ }
+
+ public static FieldNamedPreparedStatement prepareStatement(
+ Connection connection, String sql, String[] fieldNames) throws SQLException {
+ checkNotNull(connection, "connection must not be null.");
+ checkNotNull(sql, "sql must not be null.");
+ checkNotNull(fieldNames, "fieldNames must not be null.");
+
+ int[][] indexMapping = new int[fieldNames.length][];
+ String parsedSQL;
+ if (sql.contains("?")) {
+ parsedSQL = sql;
+ for (int i = 0; i < fieldNames.length; i++) {
+ // SQL statement parameter index starts from 1
+ indexMapping[i] = new int[] {i + 1};
+ }
+ } else {
+ HashMap<String, List<Integer>> parameterMap = new HashMap<>();
+ parsedSQL = parseNamedStatement(sql, parameterMap);
+ // currently, the statements must contain all the field parameters
+ checkArgument(parameterMap.size() == fieldNames.length);
+ for (int i = 0; i < fieldNames.length; i++) {
+ String fieldName = fieldNames[i];
+ checkArgument(
+ parameterMap.containsKey(fieldName),
+ fieldName + " doesn't exist in the parameters of SQL statement: " + sql);
+ indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray();
+ }
+ }
+ return new FieldNamedPreparedStatement(
+ connection.prepareStatement(parsedSQL), indexMapping);
+ }
+
+ public static String parseNamedStatement(String sql, Map<String, List<Integer>> paramMap) {
+ StringBuilder parsedSql = new StringBuilder();
+ int fieldIndex = 1; // SQL statement parameter index starts from 1
+ int length = sql.length();
+ for (int i = 0; i < length; i++) {
+ char c = sql.charAt(i);
+ if (':' == c) {
+ int j = i + 1;
+ while (j < length && Character.isJavaIdentifierPart(sql.charAt(j))) {
+ j++;
+ }
+ String parameterName = sql.substring(i + 1, j);
+ checkArgument(
+ !parameterName.isEmpty(),
+ "Named parameters in SQL statement must not be empty.");
+ paramMap.computeIfAbsent(parameterName, n -> new ArrayList<>()).add(fieldIndex);
+ fieldIndex++;
+ i = j - 1;
+ parsedSql.append('?');
+ } else {
+ parsedSql.append(c);
+ }
+ }
+ return parsedSql.toString();
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
new file mode 100644
index 000000000..38e8fc4ae
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcSinkCDCChangelogIT extends TestSuiteBase implements TestResource {
+ private static final String PG_IMAGE = "postgres:alpine3.16";
+ private static final String PG_DRIVER_JAR = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+ private PostgreSQLContainer<?> postgreSQLContainer;
+
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory = container -> {
+ Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + PG_DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode());
+ };
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ postgreSQLContainer = new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("postgresql")
+ .withExposedPorts(5432)
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+ Startables.deepStart(Stream.of(postgreSQLContainer)).join();
+ log.info("PostgreSQL container started");
+ Class.forName(postgreSQLContainer.getDriverClassName());
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(() -> initializeJdbcTable());
+ }
+
+ @TestTemplate
+ public void testSinkCDCChangelog(TestContainer container) throws IOException, InterruptedException, SQLException {
+ Container.ExecResult execResult = container.executeJob("/jdbc_sink_cdc_changelog.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Set<List<Object>> actual = new HashSet<>();
+ try (Connection connection = DriverManager.getConnection(postgreSQLContainer.getJdbcUrl(),
+ postgreSQLContainer.getUsername(), postgreSQLContainer.getPassword())) {
+ try (Statement statement = connection.createStatement()) {
+ ResultSet resultSet = statement.executeQuery("select * from sink");
+ while (resultSet.next()) {
+ List<Object> row = Arrays.asList(
+ resultSet.getLong("pk_id"),
+ resultSet.getString("name"),
+ resultSet.getInt("score"));
+ actual.add(row);
+ }
+ }
+ }
+ Set<List<Object>> expected = Stream.<List<Object>>of(
+ Arrays.asList(1L, "A_1", 100),
+ Arrays.asList(3L, "C", 100))
+ .collect(Collectors.toSet());
+ Assertions.assertIterableEquals(expected, actual);
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(postgreSQLContainer.getJdbcUrl(),
+ postgreSQLContainer.getUsername(), postgreSQLContainer.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sink = "create table sink(\n" +
+ "pk_id BIGINT NOT NULL PRIMARY KEY,\n" +
+ "name varchar(255),\n" +
+ "score INT\n" +
+ ")";
+ statement.execute(sink);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing PostgreSql table failed!", e);
+ }
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (postgreSQLContainer != null) {
+ postgreSQLContainer.stop();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
new file mode 100644
index 000000000..188c7bf5d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_sink_cdc_changelog.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ },
+ {
+ kind = UPDATE_BEFORE
+ fields = [1, "A", 100]
+ },
+ {
+ kind = UPDATE_AFTER
+ fields = [1, "A_1", 100]
+ },
+ {
+ kind = DELETE
+ fields = [2, "B", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Jdbc {
+ driver = org.postgresql.Driver
+ url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ user = test
+ password = test
+
+ table = sink
+ primary_keys = ["pk_id"]
+ }
+}
\ No newline at end of file