Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/07 02:02:07 UTC

[shardingsphere] branch master updated: Optimized CDC implementation with MySQL and PostgreSQL support (#23906)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c41c1abf6 Optimized CDC implementation with MySQL and PostgreSQL support (#23906)
26c41c1abf6 is described below

commit 26c41c1abf6ca68bdb6679b1af74324448920972
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Feb 7 10:01:56 2023 +0800

    Optimized CDC implementation with MySQL and PostgreSQL support (#23906)
    
    * Optimized CDC implementation with MySQL and PostgreSQL support
    
    * Add CDC client implementation.
    
    * Fix codestyle
    
    * Refactor ImportDataSourceParameter
    
    * Fix ci error
    
    * Fix name
---
 kernel/data-pipeline/cdc/client/pom.xml            |  4 ++
 .../data/pipeline/cdc/client/CDCClient.java        | 16 +++--
 .../cdc/client/handler/LoginRequestHandler.java    |  4 +-
 .../client/handler/SubscriptionRequestHandler.java |  6 +-
 ...nGaussImporter.java => DataSourceImporter.java} | 38 ++++-------
 .../ImportDataSourceParameter.java}                | 31 ++++-----
 .../client/parameter/StartCDCClientParameter.java  |  4 ++
 .../cdc/client/sqlbuilder/MySQLSQLBuilder.java     | 79 ++++++++++++++++++++++
 .../cdc/client/sqlbuilder/OpenGaussSQLBuilder.java | 15 +++-
 .../client/sqlbuilder/PostgreSQLSQLBuilder.java    | 74 ++++++++++++++++++++
 .../cdc/client/sqlbuilder/SQLBuilderFactory.java   |  6 +-
 .../pipeline/cdc/client/util/AnyValueConvert.java  |  7 +-
 .../cdc/client/example/opengauss/Bootstrap.java    |  4 +-
 .../cdc/client/sqlbuilder/MySQLSQLBuilderTest.java | 62 +++++++++++++++++
 .../client/sqlbuilder/OpenGaussSQLBuilderTest.java | 62 +++++++++++++++++
 .../sqlbuilder/PostgreSQLSQLBuilderTest.java       | 62 +++++++++++++++++
 .../src/test/resources/env/opengauss.properties    | 22 ------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 14 +++-
 .../importer/connector/CDCImporterConnector.java   |  2 +-
 .../pipeline/cdc/util/ColumnValueConvertUtil.java  | 37 +++-------
 .../mysql/ingest/MySQLIncrementalDumper.java       |  2 +-
 .../backend/handler/cdc/CDCBackendHandler.java     |  5 +-
 .../pipeline/cases/base/PipelineBaseE2EIT.java     |  6 +-
 .../cases/migration/AbstractMigrationE2EIT.java    |  7 --
 24 files changed, 447 insertions(+), 122 deletions(-)

diff --git a/kernel/data-pipeline/cdc/client/pom.xml b/kernel/data-pipeline/cdc/client/pom.xml
index 03be36d694b..f549dbb815f 100644
--- a/kernel/data-pipeline/cdc/client/pom.xml
+++ b/kernel/data-pipeline/cdc/client/pom.xml
@@ -47,5 +47,9 @@
             <groupId>org.opengauss</groupId>
             <artifactId>opengauss-jdbc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
     </dependencies>
 </project>
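
Note: with mysql-connector-java on the classpath, the client can open plain JDBC connections to MySQL targets through DriverManager, just as it already does for openGauss. A minimal sketch, with an illustrative URL and credentials:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    public final class MySQLConnectionSketch {

        public static void main(final String[] args) throws SQLException {
            // The driver registers itself via the JDBC service loader; no Class.forName is needed.
            try (Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/cdc_db", "root", "root")) {
                System.out.println(connection.getMetaData().getDatabaseProductVersion());
            }
        }
    }
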
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index 29e8dc33f38..fbb0a5c8f79 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -28,7 +28,6 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufEncoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
 import org.apache.shardingsphere.data.pipeline.cdc.client.handler.SubscriptionRequestHandler;
@@ -69,15 +68,15 @@ public final class CDCClient {
     /**
      * Start ShardingSphere CDC client.
      */
-    @SneakyThrows(InterruptedException.class)
     public void start() {
         startInternal(parameter.getAddress(), parameter.getPort());
     }
     
-    private void startInternal(final String address, final int port) throws InterruptedException {
+    private void startInternal(final String address, final int port) {
         Bootstrap bootstrap = new Bootstrap();
+        NioEventLoopGroup group = new NioEventLoopGroup();
         bootstrap.channel(NioSocketChannel.class)
-                .group(new NioEventLoopGroup())
+                .group(group)
                 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                 .option(ChannelOption.SO_REUSEADDR, true)
                 .handler(new ChannelInitializer<NioSocketChannel>() {
@@ -92,7 +91,12 @@ public final class CDCClient {
                         channel.pipeline().addLast(new SubscriptionRequestHandler(parameter));
                     }
                 });
-        ChannelFuture future = bootstrap.connect(address, port).sync();
-        future.channel().closeFuture().sync();
+        try {
+            ChannelFuture future = bootstrap.connect(address, port).sync();
+            future.channel().closeFuture().sync();
+        } catch (final InterruptedException ex) {
+            log.warn("CDC client interrupted", ex);
+            group.shutdownGracefully();
+        }
     }
 }
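
Note: the catch above releases the event loop group only when the thread is interrupted. A common variant of the same connect-and-wait body (a sketch, not what this commit ships) frees the group on every exit path and preserves the interrupt flag:

    try {
        ChannelFuture future = bootstrap.connect(address, port).sync();
        future.channel().closeFuture().sync();
    } catch (final InterruptedException ex) {
        log.warn("CDC client interrupted", ex);
        // Restore the flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
    } finally {
        group.shutdownGracefully();
    }
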
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index f844cb13393..d14df086502 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -65,7 +65,7 @@ public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
         ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
         switch (connectionContext.getStatus()) {
             case CONNECTED:
-                setLogindRequest(ctx, response, connectionContext);
+                setLoginRequest(ctx, response, connectionContext);
                 break;
             case NOT_LOGGED_IN:
                 sendSubscriptionEvent(ctx, response, connectionContext);
@@ -75,7 +75,7 @@ public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
         }
     }
     
-    private void setLogindRequest(final ChannelHandlerContext ctx, final CDCResponse response, final ClientConnectionContext connectionContext) {
+    private void setLoginRequest(final ChannelHandlerContext ctx, final CDCResponse response, final ClientConnectionContext connectionContext) {
         ServerGreetingResult serverGreetingResult = response.getServerGreetingResult();
         log.info("Server greeting result, server version: {}, protocol version: {}", serverGreetingResult.getServerVersion(), serverGreetingResult.getProtocolVersion());
         String encryptPassword = Hashing.sha256().hashBytes(password.getBytes()).toString().toUpperCase();
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
index 8ac5fbe063a..e7288950a8a 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -23,8 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
 import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.client.importer.DataSourceImporter;
 import org.apache.shardingsphere.data.pipeline.cdc.client.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.cdc.client.importer.ImporterFactory;
 import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
@@ -51,7 +51,7 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
     
     public SubscriptionRequestHandler(final StartCDCClientParameter parameter) {
         this.parameter = parameter;
-        importer = ImporterFactory.getImporter(parameter.getDatabaseType());
+        importer = new DataSourceImporter(parameter.getDatabaseType(), parameter.getImportDataSourceParameter());
     }
     
     @Override
@@ -105,7 +105,7 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
                 // CHECKSTYLE:OFF
             } catch (final Exception ex) {
                 // CHECKSTYLE:ON
-                log.error("write data failed", ex);
+                throw new RuntimeException(ex);
             }
         }
         // TODO data needs to be processed, such as writing to a database
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java
similarity index 78%
rename from kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java
rename to kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java
index 7ff1c61f183..19fe8231793 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java
@@ -18,17 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
 
 import com.google.protobuf.Any;
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.ProtocolStringList;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.ImportDataSourceParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilder;
 import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.AnyValueConvert;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -36,36 +33,31 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.Properties;
 
 /**
- * OpenGauss importer.
+ * Data source importer.
  */
-@RequiredArgsConstructor
 @Slf4j
-public final class OpenGaussImporter implements Importer {
-    
-    private final SQLBuilder sqlBuilder = SQLBuilderFactory.getSQLBuilder("openGauss");
+public final class DataSourceImporter implements Importer {
     
     private final Connection connection;
     
-    public OpenGaussImporter() {
-        Properties properties = new Properties();
-        try (InputStream inputStream = OpenGaussImporter.class.getClassLoader().getResourceAsStream("env/opengauss.properties")) {
-            properties.load(inputStream);
-            String url = properties.getProperty("url");
-            String port = properties.getProperty("port");
-            String database = properties.getProperty("database");
-            String username = properties.getProperty("username");
-            String password = properties.getProperty("password");
-            connection = DriverManager.getConnection(String.format("jdbc:opengauss://%s:%s/%s", url, port, database), username, password);
-        } catch (final IOException | SQLException ex) {
+    private final SQLBuilder sqlBuilder;
+    
+    public DataSourceImporter(final String databaseType, final ImportDataSourceParameter dataSourceParam) {
+        String jdbcUrl = Optional.ofNullable(dataSourceParam.getJdbcUrl()).orElseThrow(() -> new IllegalArgumentException("jdbcUrl is null"));
+        String username = Optional.ofNullable(dataSourceParam.getUsername()).orElseThrow(() -> new IllegalArgumentException("username is null"));
+        String password = Optional.ofNullable(dataSourceParam.getPassword()).orElseThrow(() -> new IllegalArgumentException("password is null"));
+        try {
+            connection = DriverManager.getConnection(jdbcUrl, username, password);
+        } catch (final SQLException ex) {
             throw new RuntimeException(ex);
         }
+        sqlBuilder = SQLBuilderFactory.getSQLBuilder(databaseType);
     }
     
     @Override
-    public void write(final Record record) throws SQLException, InvalidProtocolBufferException {
+    public void write(final Record record) throws Exception {
         Optional<String> sqlOptional = buildSQL(record);
         if (!sqlOptional.isPresent()) {
             log.error("build sql failed, record {}", record);
@@ -120,7 +112,7 @@ public final class OpenGaussImporter implements Importer {
     }
     
     @Override
-    public void close() throws SQLException {
+    public void close() throws Exception {
         connection.close();
     }
 }
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
similarity index 62%
rename from kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java
rename to kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
index 68a356d3fd2..52a9664fe6d 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
@@ -15,26 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
+package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 
 /**
- * Importer factory.
+ * Import data source parameter.
  */
-public final class ImporterFactory {
+@RequiredArgsConstructor
+@Getter
+public final class ImportDataSourceParameter {
+    
+    private final String jdbcUrl;
+    
+    private final String username;
     
-    /**
-     * Get importer.
-     *
-     * @param databaseType database type
-     * @return importer
-     */
-    // TODO use SPI
-    public static Importer getImporter(final String databaseType) {
-        switch (databaseType) {
-            case "openGauss":
-                return new OpenGaussImporter();
-            default:
-                return null;
-        }
-    }
+    private final String password;
 }
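
Note: together with DataSourceImporter above, target connection settings are now passed in explicitly instead of being read from a bundled properties file. A construction sketch with illustrative values:

    import org.apache.shardingsphere.data.pipeline.cdc.client.importer.DataSourceImporter;
    import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.ImportDataSourceParameter;

    public final class ImporterSketch {

        public static void main(final String[] args) throws Exception {
            // Any JDBC URL whose driver is on the classpath works; values here are examples only.
            ImportDataSourceParameter param = new ImportDataSourceParameter("jdbc:mysql://127.0.0.1:3306/cdc_db", "root", "root");
            DataSourceImporter importer = new DataSourceImporter("MySQL", param);
            // importer.write(record) builds and executes one SQL statement per CDC record.
            importer.close();
        }
    }
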
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index 0377c1f9f55..a8c916f7b0c 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
@@ -29,6 +30,7 @@ import java.util.List;
  */
 @Getter
 @Setter
+@RequiredArgsConstructor
 public final class StartCDCClientParameter {
     
     private String databaseType;
@@ -50,4 +52,6 @@ public final class StartCDCClientParameter {
     private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
     
     private boolean incrementalGlobalOrderly;
+    
+    private final ImportDataSourceParameter importDataSourceParameter;
 }
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilder.java
new file mode 100644
index 00000000000..e9ea01263d4
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * MySQL SQL builder.
+ */
+public final class MySQLSQLBuilder extends AbstractSQLBuilder {
+    
+    private static final List<String> RESERVED_KEYWORDS = Arrays.asList("ADD", "ALL", "ALTER", "ANALYZE", "AND", "AS", "ASC", "BEFORE", "BETWEEN", "BIGINT", "BINARY", "BLOB", "BOTH", "BY", "CALL",
+            "CASCADE", "CASE", "CHANGE", "CHAR", "CHARACTER", "CHECK", "COLLATE", "COLUMN", "CONDITION", "CONSTRAINT", "CONTINUE", "CONVERT", "CREATE", "CROSS", "CUBE", "CUME_DIST", "CURRENT_DATE",
+            "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "CURSOR", "DATABASE", "DATABASES", "DAY_HOUR", "DAY_MICROSECOND", "DAY_MINUTE", "DAY_SECOND", "DEC", "DECIMAL", "DECLARE", "DEFAULT",
+            "DELAYED", "DELETE", "DENSE_RANK", "DESC", "DESCRIBE", "DETERMINISTIC", "DISTINCT", "DISTINCTROW", "DIV", "DOUBLE", "DROP", "DUAL", "ELSE", "ELSEIF", "ENCLOSED", "ESCAPED", "EACH", "ELSE",
+            "ELSEIF", "EMPTY", "ENCLOSED", "ESCAPED", "EXCEPT", "EXISTS", "EXIT", "EXPLAIN", "FALSE", "FETCH", "FIRST_VALUE", "FLOAT4", "FLOAT8", "FOR", "FORCE", "FOREIGN", "FROM", "FULLTEXT",
+            "FUNCTION", "GENERATED", "GET", "GRANT", "GROUP", "GROUPING", "GROUPS", "HAVING", "HIGH_PRIORITY", "HOUR_MICROSECOND", "HOUR_MINUTE", "HOUR_SECOND", "IF", "IGNORE", "IN", "INDEX",
+            "INFILE", "INNER", "INOUT", "INSENSITIVE", "INSERT", "INT", "INT1", "INT2", "INT3", "INT4", "INT8", "INTEGER", "INTERSECT", "INTERVAL", "INTO", "IO_AFTER_GTIDS", "IO_BEFORE_GTIDS", "IS",
+            "ITERATE", "JOIN", "JSON_TABLE", "KEY", "KEYS", "KILL", "LAG", "LAST_VALUE", "LATERAL", "LEAD", "LEADING", "LEAVE", "LEFT", "LIKE", "LIMIT", "LINES", "LOAD", "LOCALTIME", "LOCALTIMESTAMP",
+            "LOCK", "LONG", "LONGBLOB", "LONGTEXT", "LOOP", "LOW_PRIORITY", "MASTER_BIND", "MASTER_SSL_VERIFY_SERVER_CERT", "MATCH", "MAXVALUE", "MEDIUMBLOB", "MEDIUMINT", "MEDIUMTEXT", "MIDDLEINT",
+            "MINUTE_MICROSECOND", "MINUTE_SECOND", "MOD", "MODIFIES", "NATURAL", "NOT", "NO_WRITE_TO_BINLOG", "NTH_VALUE", "NTILE", "NULL", "NUMERIC", "OF", "ON", "OPTIMIZE", "OPTIMIZER_COSTS",
+            "OPTION", "OPTIONALLY", "OR", "ORDER", "OUT", "OUTER", "OUTFILE", "OVER", "PARTITION", "PERCENT_RANK", "PRECISION", "PRIMARY", "PROCEDURE", "PURGE", "RANK", "READ", "REAL", "RECURSIVE",
+            "REFERENCES", "REGEXP", "RELEASE", "RENAME", "REPEAT", "REPLACE", "REQUIRE", "RESIGNAL", "RESTRICT", "RETURN", "REVOKE", "RIGHT", "RLIKE", "ROW", "ROWS", "ROW_NUMBER", "SCHEMA", "SCHEMAS",
+            "SELECT", "SENSITIVE", "SEPARATOR", "SET", "SHOW", "SIGNAL", "SMALLINT", "SPATIAL", "SPECIFIC", "SQL", "SQLEXCEPTION", "SQLSTATE", "SQLWARNING", "SQL_BIG_RESULT", "SQL_CALC_FOUND_ROWS",
+            "SQL_SMALL_RESULT", "SSL", "STARTING", "STORED", "STRAIGHT_JOIN", "SYSTEM", "TABLE", "TERMINATED", "THEN", "TINYBLOB", "TINYINT", "TINYTEXT", "TO", "TRAILING", "TRIGGER", "TRUE", "UNDO",
+            "UNION", "UNIQUE", "UNLOCK", "UNSIGNED", "UPDATE", "USAGE", "USE", "USING", "UTC_DATE", "UTC_TIME", "UTC_TIMESTAMP", "VALUES", "VARBINARY", "VARCHAR", "VARCHARACTER", "VARYING", "VIRTUAL",
+            "WHEN", "WHERE", "WHILE", "WINDOW", "WITH", "WRITE", "XOR", "YEAR_MONTH", "ZEROFILL");
+    
+    @Override
+    protected boolean isKeyword(final String item) {
+        return RESERVED_KEYWORDS.contains(item.toUpperCase());
+    }
+    
+    @Override
+    protected String getLeftIdentifierQuoteString() {
+        return "`";
+    }
+    
+    @Override
+    protected String getRightIdentifierQuoteString() {
+        return "`";
+    }
+    
+    @Override
+    public String buildInsertSQL(final Record record) {
+        String insertSql = super.buildInsertSQL(record);
+        List<String> uniqueKeyNamesList = record.getTableMetaData().getUniqueKeyNamesList();
+        if (uniqueKeyNamesList.isEmpty()) {
+            return insertSql;
+        }
+        StringBuilder updateValue = new StringBuilder();
+        for (String each : record.getAfterMap().keySet()) {
+            if (uniqueKeyNamesList.contains(each)) {
+                continue;
+            }
+            updateValue.append(quote(each)).append("=VALUES(").append(quote(each)).append("),");
+        }
+        updateValue.setLength(updateValue.length() - 1);
+        return insertSql + " ON DUPLICATE KEY UPDATE " + updateValue;
+    }
+}
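
Note: for a t_order record with columns (order_id, user_id, status) and order_id as the unique key, this builder emits the following statement (matching MySQLSQLBuilderTest below); unique-key columns are skipped in the update list:

    INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?) ON DUPLICATE KEY UPDATE user_id=VALUES(user_id),status=VALUES(status)
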
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
index bb0132b27a0..afc9b401b7c 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
@@ -58,9 +58,18 @@ public final class OpenGaussSQLBuilder extends AbstractSQLBuilder {
     @Override
     public String buildInsertSQL(final Record record) {
         String insertSql = super.buildInsertSQL(record);
-        if (!record.getTableMetaData().getUniqueKeyNamesList().isEmpty()) {
-            return insertSql + " ON DUPLICATE KEY UPDATE NOTHING";
+        List<String> uniqueKeyNamesList = record.getTableMetaData().getUniqueKeyNamesList();
+        if (uniqueKeyNamesList.isEmpty()) {
+            return insertSql;
         }
-        return insertSql;
+        StringBuilder updateValue = new StringBuilder();
+        for (String each : record.getAfterMap().keySet()) {
+            if (uniqueKeyNamesList.contains(each)) {
+                continue;
+            }
+            updateValue.append(quote(each)).append("=EXCLUDED.").append(quote(each)).append(",");
+        }
+        updateValue.setLength(updateValue.length() - 1);
+        return insertSql + " ON DUPLICATE KEY UPDATE " + updateValue;
     }
 }
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilder.java
new file mode 100644
index 00000000000..95c5c37ce72
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * PostgreSQL SQL builder.
+ */
+public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
+    
+    private static final List<String> RESERVED_KEYWORDS = Arrays.asList("ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC", "ASYMMETRIC", "AUTHORIZATION", "BETWEEN", "BIGINT", "BINARY",
+            "BIT", "BOOLEAN", "BOTH", "CASE", "CAST", "CHAR", "CHARACTER", "CHECK", "COALESCE", "COLLATE", "COLLATION", "COLUMN", "CONCURRENTLY", "CONSTRAINT", "CREATE", "CROSS", "CURRENT_CATALOG",
+            "CURRENT_DATE", "CURRENT_ROLE", "CURRENT_SCHEMA", "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "DEC", "DECIMAL", "DEFAULT", "DEFERRABLE", "DESC", "DISTINCT", "DO", "ELSE", "END",
+            "EXCEPT", "EXISTS", "EXTRACT", "FALSE", "FETCH", "FLOAT", "FOR", "FOREIGN", "FREEZE", "FROM", "FULL", "GRANT", "GREATEST", "GROUP", "GROUPING", "HAVING", "ILIKE", "IN", "INITIALLY",
+            "INNER", "INOUT", "INT", "INTEGER", "INTERSECT", "INTERVAL", "INTO", "IS", "ISNULL", "JOIN", "LATERAL", "LEADING", "LEAST", "LEFT", "LIKE", "LIMIT", "LOCALTIME", "LOCALTIMESTAMP",
+            "NATIONAL", "NATURAL", "NCHAR", "NONE", "NORMALIZE", "NOT", "NOTNULL", "NULL", "NULLIF", "NUMERIC", "OFFSET", "ON", "ONLY", "OR", "ORDER", "OUT", "OUTER", "OVERLAPS", "OVERLAY", "PLACING",
+            "POSITION", "PRECISION", "PRIMARY", "REAL", "REFERENCES", "RETURNING", "RIGHT", "ROW", "SELECT", "SESSION_USER", "SETOF", "SIMILAR", "SMALLINT", "SOME", "SUBSTRING", "SYMMETRIC", "TABLE",
+            "TABLESAMPLE", "THEN", "TIME", "TIMESTAMP", "TO", "TRAILING", "TREAT", "TRIM", "TRUE", "UNION", "UNIQUE", "USER", "USING", "VALUES", "VARCHAR", "VARIADIC", "VERBOSE", "WHEN", "WHERE",
+            "WINDOW", "WITH", "XMLATTRIBUTES", "XMLCONCAT", "XMLELEMENT", "XMLEXISTS", "XMLFOREST", "XMLNAMESPACES", "XMLPARSE", "XMLPI", "XMLROOT", "XMLSERIALIZE", "XMLTABLE");
+    
+    @Override
+    protected boolean isKeyword(final String item) {
+        return RESERVED_KEYWORDS.contains(item.toUpperCase());
+    }
+    
+    @Override
+    protected String getLeftIdentifierQuoteString() {
+        return "\"";
+    }
+    
+    @Override
+    protected String getRightIdentifierQuoteString() {
+        return "\"";
+    }
+    
+    @Override
+    public String buildInsertSQL(final Record record) {
+        String insertSql = super.buildInsertSQL(record);
+        List<String> uniqueKeyNamesList = record.getTableMetaData().getUniqueKeyNamesList();
+        if (uniqueKeyNamesList.isEmpty()) {
+            return insertSql;
+        }
+        StringBuilder updateValue = new StringBuilder();
+        for (String each : record.getAfterMap().keySet()) {
+            if (uniqueKeyNamesList.contains(each)) {
+                continue;
+            }
+            updateValue.append(quote(each)).append("=EXCLUDED.").append(quote(each)).append(",");
+        }
+        updateValue.setLength(updateValue.length() - 1);
+        String uniqueKeyNames = uniqueKeyNamesList.stream().map(this::quote).collect(Collectors.joining(","));
+        return insertSql + String.format(" ON CONFLICT (%s) DO UPDATE SET %s", uniqueKeyNames, updateValue);
+    }
+}
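
Note: given the same record, the PostgreSQL builder produces the standard ON CONFLICT upsert (matching PostgreSQLSQLBuilderTest below), while the openGauss builder above keeps that dialect's ON DUPLICATE KEY UPDATE keyword but references EXCLUDED values:

    INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?) ON CONFLICT (order_id) DO UPDATE SET user_id=EXCLUDED.user_id,status=EXCLUDED.status
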
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
index 934a2768c3a..6fdb544eb6e 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
@@ -23,7 +23,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
 public final class SQLBuilderFactory {
     
     /**
-     * Get sql builder.
+     * Get SQL builder.
      *
      * @param databaseType database type
      * @return SQL builder
@@ -32,6 +32,10 @@ public final class SQLBuilderFactory {
         switch (databaseType) {
             case "openGauss":
                 return new OpenGaussSQLBuilder();
+            case "MySQL":
+                return new MySQLSQLBuilder();
+            case "PostgreSQL":
+                return new PostgreSQLSQLBuilder();
             default:
                 throw new UnsupportedOperationException(String.format("Not supported %s now", databaseType));
         }
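
Note: the factory matches the database type string exactly, so callers must pass the canonical spelling; a usage sketch:

    SQLBuilder sqlBuilder = SQLBuilderFactory.getSQLBuilder("MySQL");
    // "mysql" or "postgresql" would fall through to UnsupportedOperationException.
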
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
index 71f39a31cec..afc52c8c555 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
@@ -26,7 +26,7 @@ import com.google.protobuf.Int32Value;
 import com.google.protobuf.Int64Value;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.StringValue;
-import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BlobValue;
@@ -43,6 +43,7 @@ import java.time.LocalTime;
 /**
  * Any value convert.
  */
+@Slf4j
 public final class AnyValueConvert {
     
     /**
@@ -98,7 +99,9 @@ public final class AnyValueConvert {
         if (any.is(BlobValue.class)) {
             return any.unpack(BlobValue.class).getValue().toByteArray();
         }
-        return JsonFormat.printer().includingDefaultValueFields().print(any);
+        // TODO JsonFormat can not be used here, since it might change the original value without any error prompt. More types need to be covered.
+        log.error("unsupported unpack value={}", any);
+        throw new UnsupportedOperationException(String.format("unsupported unpack type %s", any.getTypeUrl()));
     }
     
     private static Timestamp converProtobufTimestamp(final com.google.protobuf.Timestamp timestamp) {
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
index f7c81de30e5..b86b112057e 100644
--- a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.client.example.opengauss;
 
 import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.ImportDataSourceParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
@@ -32,7 +33,8 @@ public final class Bootstrap {
      * @param args args
      */
     public static void main(final String[] args) {
-        StartCDCClientParameter parameter = new StartCDCClientParameter();
+        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss:localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");
+        StartCDCClientParameter parameter = new StartCDCClientParameter(importDataSourceParam);
         parameter.setAddress("127.0.0.1");
         parameter.setPort(33071);
         parameter.setUsername("root");
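
Note: the bundled example still targets openGauss. A hypothetical MySQL counterpart, assuming the same CDCClient entry point the openGauss Bootstrap uses (URL, ports and credentials are illustrative):

    ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:mysql://127.0.0.1:3306/cdc_db", "root", "root");
    StartCDCClientParameter parameter = new StartCDCClientParameter(importDataSourceParam);
    parameter.setDatabaseType("MySQL");
    parameter.setAddress("127.0.0.1");
    parameter.setPort(33071);
    parameter.setUsername("root");
    new CDCClient(parameter).start();
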
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilderTest.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilderTest.java
new file mode 100644
index 00000000000..f922596f4dc
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.StringValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public final class MySQLSQLBuilderTest {
+    
+    @Test
+    public void assertBuildInsertSQLWithUniqueKey() {
+        MySQLSQLBuilder sqlBuilder = new MySQLSQLBuilder();
+        TableMetaData tableMetaData = TableMetaData.newBuilder().setTableName("t_order").addUniqueKeyNames("order_id").setDatabase("cdc_db").build();
+        Record record = Record.newBuilder().setTableMetaData(tableMetaData).putAllAfter(buildAfterMap()).build();
+        String actualSql = sqlBuilder.buildInsertSQL(record);
+        String expectedSql = "INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?) ON DUPLICATE KEY UPDATE user_id=VALUES(user_id),status=VALUES(status)";
+        assertThat(actualSql, is(expectedSql));
+    }
+    
+    private Map<String, Any> buildAfterMap() {
+        Map<String, Any> result = new LinkedHashMap<>();
+        result.put("order_id", Any.pack(Int32Value.of(1)));
+        result.put("user_id", Any.pack(Int32Value.of(2)));
+        result.put("status", Any.pack(StringValue.of("OK")));
+        return result;
+    }
+    
+    @Test
+    public void assertBuildInsertSQLWithoutUniqueKey() {
+        MySQLSQLBuilder sqlBuilder = new MySQLSQLBuilder();
+        TableMetaData tableMetaData = TableMetaData.newBuilder().setTableName("t_order").setDatabase("cdc_db").build();
+        Record record = Record.newBuilder().setTableMetaData(tableMetaData).putAllAfter(buildAfterMap()).build();
+        String actualSql = sqlBuilder.buildInsertSQL(record);
+        String expectedSql = "INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?)";
+        assertThat(actualSql, is(expectedSql));
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilderTest.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilderTest.java
new file mode 100644
index 00000000000..90a34bd8de8
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.StringValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public final class OpenGaussSQLBuilderTest {
+    
+    @Test
+    public void assertBuildInsertSQLWithUniqueKey() {
+        OpenGaussSQLBuilder sqlBuilder = new OpenGaussSQLBuilder();
+        TableMetaData tableMetaData = TableMetaData.newBuilder().setTableName("t_order").addUniqueKeyNames("order_id").setDatabase("cdc_db").build();
+        Record record = Record.newBuilder().setTableMetaData(tableMetaData).putAllAfter(buildAfterMap()).build();
+        String actualSql = sqlBuilder.buildInsertSQL(record);
+        String expectedSql = "INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?) ON DUPLICATE KEY UPDATE user_id=EXCLUDED.user_id,status=EXCLUDED.status";
+        assertThat(actualSql, is(expectedSql));
+    }
+    
+    private Map<String, Any> buildAfterMap() {
+        Map<String, Any> result = new LinkedHashMap<>();
+        result.put("order_id", Any.pack(Int32Value.of(1)));
+        result.put("user_id", Any.pack(Int32Value.of(2)));
+        result.put("status", Any.pack(StringValue.of("OK")));
+        return result;
+    }
+    
+    @Test
+    public void assertBuildInsertSQLWithoutUniqueKey() {
+        OpenGaussSQLBuilder sqlBuilder = new OpenGaussSQLBuilder();
+        TableMetaData tableMetaData = TableMetaData.newBuilder().setTableName("t_order").setDatabase("cdc_db").build();
+        Record record = Record.newBuilder().setTableMetaData(tableMetaData).putAllAfter(buildAfterMap()).build();
+        String actualSql = sqlBuilder.buildInsertSQL(record);
+        String expectedSql = "INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?)";
+        assertThat(actualSql, is(expectedSql));
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilderTest.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilderTest.java
new file mode 100644
index 00000000000..1cefb83e7dc
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.StringValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public final class PostgreSQLSQLBuilderTest {
+    
+    @Test
+    public void assertBuildInsertSQLWithUniqueKey() {
+        PostgreSQLSQLBuilder sqlBuilder = new PostgreSQLSQLBuilder();
+        TableMetaData tableMetaData = TableMetaData.newBuilder().setTableName("t_order").addUniqueKeyNames("order_id").setDatabase("cdc_db").build();
+        Record record = Record.newBuilder().setTableMetaData(tableMetaData).putAllAfter(buildAfterMap()).build();
+        String actualSql = sqlBuilder.buildInsertSQL(record);
+        String expectedSql = "INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?) ON CONFLICT (order_id) DO UPDATE SET user_id=EXCLUDED.user_id,status=EXCLUDED.status";
+        assertThat(actualSql, is(expectedSql));
+    }
+    
+    private Map<String, Any> buildAfterMap() {
+        Map<String, Any> result = new LinkedHashMap<>();
+        result.put("order_id", Any.pack(Int32Value.of(1)));
+        result.put("user_id", Any.pack(Int32Value.of(2)));
+        result.put("status", Any.pack(StringValue.of("OK")));
+        return result;
+    }
+    
+    @Test
+    public void assertBuildInsertSQLWithoutUniqueKey() {
+        PostgreSQLSQLBuilder sqlBuilder = new PostgreSQLSQLBuilder();
+        TableMetaData tableMetaData = TableMetaData.newBuilder().setTableName("t_order").setDatabase("cdc_db").build();
+        Record record = Record.newBuilder().setTableMetaData(tableMetaData).putAllAfter(buildAfterMap()).build();
+        String actualSql = sqlBuilder.buildInsertSQL(record);
+        String expectedSql = "INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?)";
+        assertThat(actualSql, is(expectedSql));
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties b/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties
deleted file mode 100644
index 88f4616e123..00000000000
--- a/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-url=127.0.0.1
-port=5432
-database=cdc_db
-username=gaussdb
-password=Root@123
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 4aa5b30e55a..5708a485a4d 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -74,7 +74,6 @@ import javax.sql.DataSource;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -175,7 +174,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
         Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
         dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(Collections.emptyMap());
+        TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getTableNames());
         String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
         StandardPipelineDataSourceConfiguration actualDataSourceConfiguration = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
         DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, dataSourceName, actualDataSourceConfiguration, tableNameMap, tableNameSchemaNameMapping);
@@ -185,6 +184,17 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         return result;
     }
     
+    private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final List<String> tableNames) {
+        Map<LogicTableName, String> tableNameSchemaMap = new LinkedHashMap<>();
+        for (String each : tableNames) {
+            String[] split = each.split("\\.");
+            if (split.length > 1) {
+                tableNameSchemaMap.put(new LogicTableName(split[1]), split[0]);
+            }
+        }
+        return new TableNameSchemaNameMapping(tableNameSchemaMap);
+    }
+    
     private static DumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSourceConfig,
                                                                 final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         DumperConfiguration result = new DumperConfiguration();
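
Note: getTableNameSchemaNameMapping only records schema-qualified entries: "test_schema.t_order" maps logic table t_order to schema test_schema, while an unqualified "t_order" contributes nothing. The splitting rule, as a sketch:

    String[] split = "test_schema.t_order".split("\\.");
    // split[0] = "test_schema" (schema name), split[1] = "t_order" (logic table name)
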
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index 881629588ba..d1b3e8e90bc 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -90,7 +90,7 @@ public final class CDCImporterConnector implements ImporterConnector {
         this.jobShardingCount = jobShardingCount;
         tableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
             String[] split = each.split("\\.");
-            tableNameSchemaMap.put(split[0], split[1]);
+            tableNameSchemaMap.put(split[1], split[0]);
         });
         this.dataRecordComparator = dataRecordComparator;
     }
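
Note: this is the same orientation fix as in CDCJobAPI above: the map now keys on the table name (split[1]) and stores the schema (split[0]), so "test_schema.t_order" maps t_order to test_schema instead of the reverse.
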
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
index c62c9327748..b84cf56906b 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.util;
 
-import com.google.gson.Gson;
 import com.google.protobuf.BoolValue;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
@@ -25,12 +24,8 @@ import com.google.protobuf.DoubleValue;
 import com.google.protobuf.FloatValue;
 import com.google.protobuf.Int32Value;
 import com.google.protobuf.Int64Value;
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
 import com.google.protobuf.StringValue;
-import com.google.protobuf.Struct;
-import com.google.protobuf.util.JsonFormat;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
@@ -58,8 +53,6 @@ import java.util.Date;
 @Slf4j
 public final class ColumnValueConvertUtil {
     
-    private static final Gson GSON = new Gson();
-    
     /**
      * Convert java object to protobuf message.
      *
@@ -71,37 +64,37 @@ public final class ColumnValueConvertUtil {
             return NullValue.newBuilder().build();
         }
         if (object instanceof Integer) {
-            return Int32Value.newBuilder().setValue((int) object).build();
+            return Int32Value.of((int) object);
         }
         if (object instanceof Short) {
-            return Int32Value.newBuilder().setValue(((Short) object).intValue()).build();
+            return Int32Value.of(((Short) object).intValue());
         }
         if (object instanceof Byte) {
-            return Int32Value.newBuilder().setValue(((Byte) object).intValue()).build();
+            return Int32Value.of(((Byte) object).intValue());
         }
         if (object instanceof Long) {
-            return Int64Value.newBuilder().setValue((long) object).build();
+            return Int64Value.of((long) object);
         }
         if (object instanceof BigInteger) {
             return BigIntegerValue.newBuilder().setValue(ByteString.copyFrom(((BigInteger) object).toByteArray())).build();
         }
         if (object instanceof Float) {
-            return FloatValue.newBuilder().setValue((float) object).build();
+            return FloatValue.of((float) object);
         }
         if (object instanceof Double) {
-            return DoubleValue.newBuilder().setValue((double) object).build();
+            return DoubleValue.of((double) object);
         }
         if (object instanceof BigDecimal) {
             return BigDecimalValue.newBuilder().setValue(object.toString()).build();
         }
         if (object instanceof String) {
-            return StringValue.newBuilder().setValue(object.toString()).build();
+            return StringValue.of(object.toString());
         }
         if (object instanceof Boolean) {
-            return BoolValue.newBuilder().setValue((boolean) object).build();
+            return BoolValue.of((boolean) object);
         }
         if (object instanceof byte[]) {
-            return BytesValue.newBuilder().setValue(ByteString.copyFrom((byte[]) object)).build();
+            return BytesValue.of(ByteString.copyFrom((byte[]) object));
         }
         if (object instanceof Date) {
             return converToProtobufTimestamp((Date) object);
@@ -141,21 +134,11 @@ public final class ColumnValueConvertUtil {
                 throw new RuntimeException(ex);
             }
         }
-        return fromJson(GSON.toJson(object));
+        return StringValue.newBuilder().setValue(object.toString()).build();
     }
     
     private static com.google.protobuf.Timestamp converToProtobufTimestamp(final Date timestamp) {
         long millis = timestamp.getTime();
         return com.google.protobuf.Timestamp.newBuilder().setSeconds(millis / 1000).setNanos((int) ((millis % 1000) * 1000000)).build();
     }
-    
-    private static Message fromJson(final String json) {
-        Builder structBuilder = Struct.newBuilder();
-        try {
-            JsonFormat.parser().ignoringUnknownFields().merge(json, structBuilder);
-        } catch (final InvalidProtocolBufferException ex) {
-            throw new RuntimeException(ex);
-        }
-        return structBuilder.build();
-    }
 }
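
Note: the of() factories are shorthand for newBuilder().setValue(...).build(). The substantive change is the fallback: a value of an unmapped Java type is now rendered with toString() into a StringValue rather than serialized to JSON through Gson. A sketch with an illustrative unmapped type:

    // Mapped type: wrapped directly.
    Message int32 = Int32Value.of(42);
    // Unmapped type, e.g. java.util.UUID: falls back to its string form.
    Message fallback = StringValue.newBuilder().setValue(java.util.UUID.randomUUID().toString()).build();
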
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 29c835b8b31..b2a3a15a4f3 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -156,7 +156,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
                 boolean updated = !Objects.equals(newValue, oldValue);
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
                 record.addColumn(new Column(columnMetaData.getName(),
-                        (columnMetaData.isPrimaryKey() && updated) ? handleValue(columnMetaData, oldValue) : null,
+                        handleValue(columnMetaData, oldValue),
                         handleValue(columnMetaData, newValue), updated, columnMetaData.isPrimaryKey()));
             }
             channel.pushRecord(record);
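
Note: with this change the dumper always carries a column's previous value in the record, instead of only for updated primary-key columns, so incremental consumers receive the old value for every column.
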
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 7e9fa85d54d..1137e669816 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -87,8 +87,9 @@ public final class CDCBackendHandler {
             return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
         }
         Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
-        for (String each : tableNames) {
-            actualDataNodesMap.put(each, getActualDataNodes(shardingRule.get(), each));
+        // TODO need to support case-insensitive table names later
+        for (TableName each : createSubscription.getTableNamesList()) {
+            actualDataNodesMap.put(each.getName(), getActualDataNodes(shardingRule.get(), each.getName()));
         }
         CreateSubscriptionJobParameter parameter = new CreateSubscriptionJobParameter(createSubscription.getDatabase(), tableNames, createSubscription.getSubscriptionName(),
                 createSubscription.getSubscriptionMode().name(), actualDataNodesMap, createSubscription.getIncrementalGlobalOrderly());
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 99e43b5cf4a..b42a3e7bb59 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -150,6 +150,7 @@ public abstract class PipelineBaseE2EIT {
         }
         try {
             connection.createStatement().execute(String.format("DROP DATABASE %s", PROXY_DATABASE));
+            ThreadUtil.sleep(2, TimeUnit.SECONDS);
         } catch (final SQLException ex) {
             log.warn("Drop proxy database failed, maybe it's not exist. error msg={}", ex.getMessage());
         }
@@ -189,7 +190,10 @@ public abstract class PipelineBaseE2EIT {
     }
     
     private void createProxyDatabase(final Connection connection) throws SQLException {
-        connection.createStatement().execute(String.format("CREATE DATABASE %s", PROXY_DATABASE));
+        String sql = String.format("CREATE DATABASE %s", PROXY_DATABASE);
+        log.info("create proxy database {}", PROXY_DATABASE);
+        connection.createStatement().execute(sql);
+        ThreadUtil.sleep(2, TimeUnit.SECONDS);
     }
     
     protected void addResource(final String distSQL) throws SQLException {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 4c0cf2740c1..d668060cba5 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -117,13 +117,6 @@ public abstract class AbstractMigrationE2EIT extends PipelineBaseE2EIT {
     }
     
     protected void addMigrationProcessConfig() throws SQLException {
-        if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
-            try {
-                proxyExecuteWithLog("DROP MIGRATION PROCESS CONFIGURATION '/'", 0);
-            } catch (final SQLException ex) {
-                log.warn("Drop migration process configuration failed, maybe it's not exist. error msg={}", ex.getMessage());
-            }
-        }
         proxyExecuteWithLog(migrationDistSQLCommand.getAlterMigrationRule(), 0);
     }