You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/01/04 07:22:55 UTC

[shardingsphere] branch master updated: CDC client add openGauss importer implementation (#23320)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a9d6c23064b CDC client add openGauss importer implementation (#23320)
a9d6c23064b is described below

commit a9d6c23064b96aae161f726353892335db7098e0
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Jan 4 15:22:48 2023 +0800

    CDC client add openGauss importer implementation (#23320)
    
    * CDC client add openGauss importer
    
    * Fix codestyle
    
    * Fix codestyle
---
 kernel/data-pipeline/cdc/client/pom.xml            |   4 +-
 .../data/pipeline/cdc/client/CDCClient.java        |   3 +-
 .../cdc/client/handler/LoginRequestHandler.java    |   1 +
 .../client/handler/SubscriptionRequestHandler.java |  42 +++---
 .../Importer.java}                                 |  48 +++----
 .../cdc/client/importer/ImporterFactory.java       |  40 ++++++
 .../cdc/client/importer/OpenGaussImporter.java     | 126 ++++++++++++++++++
 .../client/parameter/StartCDCClientParameter.java  |   2 +
 .../cdc/client/sqlbuilder/AbstractSQLBuilder.java  | 147 +++++++++++++++++++++
 .../cdc/client/sqlbuilder/OpenGaussSQLBuilder.java |  66 +++++++++
 .../SQLBuilder.java}                               |  57 ++++----
 .../cdc/client/sqlbuilder/SQLBuilderFactory.java   |  39 ++++++
 .../pipeline/cdc/client/util/AnyValueConvert.java  | 107 +++++++++++++++
 .../cdc/client/example/opengauss/Bootstrap.java    |  49 +++++++
 .../src/test/resources/env/opengauss.properties    |  22 +++
 .../pipeline/cdc/core/importer/CDCImporter.java    |   6 +-
 .../importer/connector/CDCImporterConnector.java   |  18 ++-
 .../data/pipeline/cdc/core/job/CDCJob.java         |   4 +-
 .../pipeline/cdc/core/task/CDCTasksRunner.java     |  40 ++++++
 .../cdc/util/DataRecordResultConvertUtil.java      |  15 ++-
 .../src/main/proto/CDCResponseProtocol.proto       |   1 +
 .../core/task/InventoryIncrementalTasksRunner.java |  20 +--
 22 files changed, 757 insertions(+), 100 deletions(-)

diff --git a/kernel/data-pipeline/cdc/client/pom.xml b/kernel/data-pipeline/cdc/client/pom.xml
index 6433c17c06f..13da97f129b 100644
--- a/kernel/data-pipeline/cdc/client/pom.xml
+++ b/kernel/data-pipeline/cdc/client/pom.xml
@@ -44,8 +44,8 @@
             <version>${netty.version}</version>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
+            <groupId>org.opengauss</groupId>
+            <artifactId>opengauss-jdbc</artifactId>
         </dependency>
     </dependencies>
 </project>
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index 4f2acf1fd90..29e8dc33f38 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -89,8 +89,7 @@ public final class CDCClient {
                         channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
                         channel.pipeline().addLast(new LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new SubscriptionRequestHandler(parameter.getDatabase(), parameter.getSubscriptionName(), parameter.getSubscribeTables(),
-                                parameter.getSubscriptionMode(), parameter.isIncrementalGlobalOrderly()));
+                        channel.pipeline().addLast(new SubscriptionRequestHandler(parameter));
                     }
                 });
         ChannelFuture future = bootstrap.connect(address, port).sync();
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index 2372f37fc86..f844cb13393 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -56,6 +56,7 @@ public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) {
         ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(null);
+        ctx.fireChannelInactive();
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
index 17855103394..8ac5fbe063a 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -19,18 +19,18 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
 import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.client.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.cdc.client.importer.ImporterFactory;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
@@ -43,24 +43,22 @@ import java.util.List;
  * Subscription request handler.
  */
 @Slf4j
-@RequiredArgsConstructor
 public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapter {
     
-    private final String database;
+    private final StartCDCClientParameter parameter;
     
-    private final String subscriptionName;
+    private final Importer importer;
     
-    private final List<TableName> subscribeTables;
-    
-    private final SubscriptionMode subscribeMode;
-    
-    private final boolean incrementalGlobalOrderly;
+    public SubscriptionRequestHandler(final StartCDCClientParameter parameter) {
+        this.parameter = parameter;
+        importer = ImporterFactory.getImporter(parameter.getDatabaseType());
+    }
     
     @Override
     public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
         if (evt instanceof CreateSubscriptionEvent) {
-            CreateSubscriptionRequest createSubscriptionRequest = CreateSubscriptionRequest.newBuilder().setDatabase(database).setSubscriptionMode(subscribeMode).setSubscriptionName(subscriptionName)
-                    .addAllTableNames(subscribeTables).setIncrementalGlobalOrderly(incrementalGlobalOrderly).build();
+            CreateSubscriptionRequest createSubscriptionRequest = CreateSubscriptionRequest.newBuilder().setDatabase(parameter.getDatabase()).setSubscriptionMode(parameter.getSubscriptionMode())
+                    .setSubscriptionName(parameter.getSubscriptionName()).addAllTableNames(parameter.getSubscribeTables()).setIncrementalGlobalOrderly(parameter.isIncrementalGlobalOrderly()).build();
             CDCRequest request = CDCRequest.newBuilder().setCreateSubscription(createSubscriptionRequest).setRequestId(RequestIdUtil.generateRequestId()).build();
             ctx.writeAndFlush(request);
         }
@@ -88,7 +86,7 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
     
     private void sendCreateSubscriptionRequest(final ChannelHandlerContext ctx, final CDCResponse response, final ClientConnectionContext connectionContext) {
         log.info("create subscription succeed, subscription name {}, exist {}", response.getCreateSubscriptionResult().getSubscriptionName(), response.getCreateSubscriptionResult().getExisting());
-        StartSubscriptionRequest startSubscriptionRequest = StartSubscriptionRequest.newBuilder().setDatabase(database).setSubscriptionName(subscriptionName).build();
+        StartSubscriptionRequest startSubscriptionRequest = StartSubscriptionRequest.newBuilder().setDatabase(parameter.getDatabase()).setSubscriptionName(parameter.getSubscriptionName()).build();
         Builder builder = CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setStartSubscription(startSubscriptionRequest);
         ctx.writeAndFlush(builder.build());
         connectionContext.setStatus(ClientConnectionStatus.CREATING_SUBSCRIPTION);
@@ -101,11 +99,25 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
     
     private void subscribeDataRecords(final ChannelHandlerContext ctx, final DataRecordResult result) {
         List<Record> recordsList = result.getRecordsList();
-        log.debug("received records {}", recordsList);
+        for (Record each : recordsList) {
+            try {
+                importer.write(each);
+                // CHECKSTYLE:OFF
+            } catch (final Exception ex) {
+                // CHECKSTYLE:ON
+                log.error("write data failed", ex);
+            }
+        }
         // TODO data needs to be processed, such as writing to a database
         ctx.channel().writeAndFlush(CDCRequest.newBuilder().setAckRequest(AckRequest.newBuilder().setAckId(result.getAckId()).build()).build());
     }
     
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+        importer.close();
+        ctx.fireChannelInactive();
+    }
+    
     @Override
     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
         log.error("subscription handler error", cause);
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/Importer.java
similarity index 50%
copy from kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
copy to kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/Importer.java
index 90d0ab31c6e..281860264f8 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/Importer.java
@@ -15,37 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
+package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
 
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
-
-import java.util.List;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 
 /**
- * Start CDC client parameter.
+ * Importer.
  */
-@Getter
-@Setter
-public final class StartCDCClientParameter {
-    
-    private String address;
-    
-    private int port;
-    
-    private String username;
-    
-    private String password;
-    
-    private String database;
-    
-    private List<TableName> subscribeTables;
-    
-    private String subscriptionName;
-    
-    private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
-    
-    private boolean incrementalGlobalOrderly;
+public interface Importer {
+    
+    /**
+     * Write one record.
+     *
+     * @param record record to write
+     * @throws Exception if the implementation fails to write the record
+     */
+    void write(Record record) throws Exception;
+    
+    /**
+     * Close importer, no record should be written through it afterwards.
+     *
+     * @throws Exception if the implementation fails to close
+     */
+    void close() throws Exception;
 }
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java
new file mode 100644
index 00000000000..68a356d3fd2
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
+
+/**
+ * Importer factory.
+ */
+public final class ImporterFactory {
+    
+    /**
+     * Get importer.
+     *
+     * @param databaseType database type, may be null when the caller did not configure one
+     * @return importer of the database type, or null if the database type is null or not supported
+     */
+    // TODO use SPI
+    public static Importer getImporter(final String databaseType) {
+        // Constant-first equals is null-safe, switching on a null String would throw NullPointerException.
+        if ("openGauss".equals(databaseType)) {
+            return new OpenGaussImporter();
+        }
+        // Only openGauss is supported for now, callers have to handle the null result.
+        return null;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java
new file mode 100644
index 00000000000..7ff1c61f183
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ProtocolStringList;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilder;
+import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.AnyValueConvert;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * OpenGauss importer, the no-arg constructor connects through JDBC to the instance configured in classpath resource env/opengauss.properties.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class OpenGaussImporter implements Importer {
+    
+    private final SQLBuilder sqlBuilder = SQLBuilderFactory.getSQLBuilder("openGauss");
+    
+    private final Connection connection;
+    
+    public OpenGaussImporter() {
+        Properties properties = new Properties();
+        try (InputStream inputStream = OpenGaussImporter.class.getClassLoader().getResourceAsStream("env/opengauss.properties")) {
+            if (null == inputStream) {
+                throw new IOException("Can not find env/opengauss.properties on classpath");
+            }
+            properties.load(inputStream);
+            String jdbcUrl = String.format("jdbc:opengauss://%s:%s/%s", properties.getProperty("url"), properties.getProperty("port"), properties.getProperty("database"));
+            connection = DriverManager.getConnection(jdbcUrl, properties.getProperty("username"), properties.getProperty("password"));
+        } catch (final IOException | SQLException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+    
+    @Override
+    public void write(final Record record) throws SQLException, InvalidProtocolBufferException {
+        String sql = buildSQL(record).orElse(null);
+        if (null == sql) {
+            log.error("build sql failed, record {}", record);
+            throw new RuntimeException("build sql failed");
+        }
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            List<Any> afterValue = new ArrayList<>(record.getAfterMap().values());
+            ProtocolStringList uniqueKeyNamesList = record.getTableMetaData().getUniqueKeyNamesList();
+            List<String> conditionColumnNames = record.getBeforeMap().keySet().containsAll(uniqueKeyNamesList) ? uniqueKeyNamesList : new ArrayList<>(record.getBeforeMap().keySet());
+            // Every branch binds its own parameters and runs the statement exactly once, so nothing is executed after the switch.
+            switch (record.getDataChangeType()) {
+                case INSERT:
+                    for (int i = 0; i < afterValue.size(); i++) {
+                        preparedStatement.setObject(i + 1, AnyValueConvert.convertToObject(afterValue.get(i)));
+                    }
+                    preparedStatement.execute();
+                    break;
+                case UPDATE:
+                    for (int i = 0; i < afterValue.size(); i++) {
+                        preparedStatement.setObject(i + 1, AnyValueConvert.convertToObject(afterValue.get(i)));
+                    }
+                    for (int i = 0; i < conditionColumnNames.size(); i++) {
+                        preparedStatement.setObject(afterValue.size() + i + 1, AnyValueConvert.convertToObject(record.getBeforeMap().get(conditionColumnNames.get(i))));
+                    }
+                    int updateCount = preparedStatement.executeUpdate();
+                    if (1 != updateCount) {
+                        log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}, conditionColumns={}", updateCount, sql, record.getAfterMap().keySet(), conditionColumnNames);
+                    }
+                    break;
+                case DELETE:
+                    // NOTE(review): condition columns are derived from the before map but their values are read from the after map, confirm which map carries the deleted row.
+                    for (int i = 0; i < conditionColumnNames.size(); i++) {
+                        preparedStatement.setObject(i + 1, AnyValueConvert.convertToObject(record.getAfterMap().get(conditionColumnNames.get(i))));
+                    }
+                    preparedStatement.execute();
+                    break;
+                default:
+            }
+        }
+    }
+    
+    private Optional<String> buildSQL(final Record record) {
+        switch (record.getDataChangeType()) {
+            case INSERT:
+                return Optional.ofNullable(sqlBuilder.buildInsertSQL(record));
+            case UPDATE:
+                return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record));
+            case DELETE:
+                return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record));
+            default:
+                return Optional.empty();
+        }
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        connection.close();
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index 90d0ab31c6e..0377c1f9f55 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -31,6 +31,8 @@ import java.util.List;
 @Setter
 public final class StartCDCClientParameter {
     
+    private String databaseType;
+    
     private String address;
     
     private int port;
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/AbstractSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/AbstractSQLBuilder.java
new file mode 100644
index 00000000000..467097b8838
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/AbstractSQLBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import com.google.common.base.Strings;
+import lombok.Getter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Abstract SQL builder, builds parameterized INSERT, UPDATE and DELETE SQL for records and caches the built SQL per table name.
+ */
+public abstract class AbstractSQLBuilder implements SQLBuilder {
+    
+    protected static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
+    
+    protected static final String UPDATE_SQL_CACHE_KEY_PREFIX = "UPDATE_";
+    
+    protected static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
+    
+    @Getter
+    private final ConcurrentMap<String, String> sqlCacheMap = new ConcurrentHashMap<>();
+    
+    /**
+     * Surround the item with the left and right identifier quote strings if it is a keyword.
+     *
+     * @param item identifier to quote
+     * @return quoted item if it is a keyword, otherwise the item itself
+     */
+    public String quote(final String item) {
+        return isKeyword(item) ? getLeftIdentifierQuoteString() + item + getRightIdentifierQuoteString() : item;
+    }
+    
+    protected abstract boolean isKeyword(String item);
+    
+    /**
+     * Get left identifier quote string.
+     *
+     * @return string
+     */
+    protected abstract String getLeftIdentifierQuoteString();
+    
+    /**
+     * Get right identifier quote string.
+     *
+     * @return string
+     */
+    protected abstract String getRightIdentifierQuoteString();
+    
+    protected final String getQualifiedTableName(final String schemaName, final String tableName) {
+        StringBuilder result = new StringBuilder();
+        if (!Strings.isNullOrEmpty(schemaName)) {
+            result.append(quote(schemaName)).append(".");
+        }
+        result.append(quote(tableName));
+        return result.toString();
+    }
+    
+    @Override
+    public String buildInsertSQL(final Record record) {
+        // Cached per table name, so the column list of the first record seen for a table is reused for later records of that table.
+        // NOTE(review): the cache key omits the schema, same-named tables in different schemas would share one SQL, confirm this is intended.
+        return sqlCacheMap.computeIfAbsent(INSERT_SQL_CACHE_KEY_PREFIX + record.getTableMetaData().getTableName(), key -> buildInsertSQLInternal(record));
+    }
+    
+    private String buildInsertSQLInternal(final Record record) {
+        Collection<String> columnNames = record.getAfterMap().keySet();
+        String columnsLiteral = joinQuoted(columnNames, "", ",");
+        // One placeholder per column, joinQuoted has already rejected an empty column collection.
+        StringBuilder holder = new StringBuilder();
+        for (int i = 0; i < columnNames.size(); i++) {
+            holder.append("?,");
+        }
+        holder.setLength(holder.length() - 1);
+        TableMetaData tableMetaData = record.getTableMetaData();
+        return String.format("INSERT INTO %s(%s) VALUES(%s)", getQualifiedTableName(tableMetaData.getSchema(), tableMetaData.getTableName()), columnsLiteral, holder);
+    }
+    
+    @Override
+    public String buildUpdateSQL(final Record record) {
+        TableMetaData tableMetaData = record.getTableMetaData();
+        // Only the "UPDATE ... SET %s WHERE ..." template is cached, the SET clause is rebuilt from the after map of every record.
+        String template = sqlCacheMap.computeIfAbsent(UPDATE_SQL_CACHE_KEY_PREFIX + tableMetaData.getTableName(),
+                key -> buildUpdateSQLInternal(tableMetaData.getSchema(), tableMetaData.getTableName(), record.getBeforeMap().keySet(), tableMetaData.getUniqueKeyNamesList()));
+        return String.format(template, joinQuoted(record.getAfterMap().keySet(), " = ?", ","));
+    }
+    
+    private String buildUpdateSQLInternal(final String schemaName, final String tableName, final Collection<String> columnNames, final Collection<String> uniqueKeyNames) {
+        return String.format("UPDATE %s SET %%s WHERE %s", getQualifiedTableName(schemaName, tableName), buildWhereSQL(columnNames, uniqueKeyNames));
+    }
+    
+    private String buildWhereSQL(final Collection<String> columnNames, final Collection<String> uniqueKeyNames) {
+        // Use the unique key columns as conditions when the given columns contain all of them, otherwise fall back to every given column.
+        return joinQuoted(columnNames.containsAll(uniqueKeyNames) ? uniqueKeyNames : columnNames, " = ?", " and ");
+    }
+    
+    // Quote every column name, append the suffix to it and join the pieces with the delimiter.
+    // An empty collection is rejected explicitly, otherwise trimming the trailing delimiter fails with StringIndexOutOfBoundsException.
+    private String joinQuoted(final Collection<String> columnNames, final String suffix, final String delimiter) {
+        if (columnNames.isEmpty()) {
+            throw new IllegalArgumentException("Can not build SQL, column names are empty");
+        }
+        StringBuilder result = new StringBuilder();
+        for (String each : columnNames) {
+            result.append(quote(each)).append(suffix).append(delimiter);
+        }
+        result.setLength(result.length() - delimiter.length());
+        return result.toString();
+    }
+    
+    /**
+     * Build delete SQL.
+     *
+     * @param record record
+     * @return delete SQL
+     */
+    @Override
+    public String buildDeleteSQL(final Record record) {
+        TableMetaData tableMetaData = record.getTableMetaData();
+        // The SQL built from the first delete record seen for a table name is cached and reused for that table name.
+        return sqlCacheMap.computeIfAbsent(DELETE_SQL_CACHE_KEY_PREFIX + tableMetaData.getTableName(),
+                key -> buildDeleteSQLInternal(tableMetaData.getSchema(), tableMetaData.getTableName(), record.getBeforeMap().keySet(), tableMetaData.getUniqueKeyNamesList()));
+    }
+    
+    private String buildDeleteSQLInternal(final String schemaName, final String tableName, final Collection<String> columnNames, final Collection<String> uniqueKeyNames) {
+        return String.format("DELETE FROM %s WHERE %s", getQualifiedTableName(schemaName, tableName), buildWhereSQL(columnNames, uniqueKeyNames));
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
new file mode 100644
index 00000000000..bb0132b27a0
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Pipeline SQL builder of openGauss.
+ */
+public final class OpenGaussSQLBuilder extends AbstractSQLBuilder {
+    
+    private static final List<String> RESERVED_KEYWORDS = Arrays.asList("ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC", "ASYMMETRIC", "AUTHID", "AUTHORIZATION", "BETWEEN", "BIGINT",
+            "BINARY", "BINARY_DOUBLE", "BINARY_INTEGER", "BIT", "BOOLEAN", "BOTH", "BUCKETCNT", "BUCKETS", "BYTEAWITHOUTORDER", "BYTEAWITHOUTORDERWITHEQUAL", "CASE", "CAST", "CHAR", "CHARACTER",
+            "CHECK", "COALESCE", "COLLATE", "COLLATION", "COLUMN", "COMPACT", "CONCURRENTLY", "CONSTRAINT", "CREATE", "CROSS", "CSN", "CURRENT_CATALOG", "CURRENT_DATE", "CURRENT_ROLE",
+            "CURRENT_SCHEMA", "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "DATE", "DEC", "DECIMAL", "DECODE", "DEFAULT", "DEFERRABLE", "DELTAMERGE", "DESC", "DISTINCT", "DO", "ELSE", "END",
+            "EXCEPT", "EXCLUDED", "EXISTS", "EXTRACT", "FALSE", "FENCED", "FETCH", "FLOAT", "FOR", "FOREIGN", "FREEZE", "FROM", "FULL", "GRANT", "GREATEST", "GROUP", "GROUPING", "GROUPPARENT",
+            "HAVING", "HDFSDIRECTORY", "ILIKE", "IN", "INITIALLY", "INNER", "INOUT", "INT", "INTEGER", "INTERSECT", "INTERVAL", "INTO", "IS", "JOIN", "LEADING", "LEAST", "LEFT", "LESS", "LIKE",
+            "LIMIT", "LOCALTIME", "LOCALTIMESTAMP", "MAXVALUE", "MINUS", "MODIFY", "NATIONAL", "NATURAL", "NCHAR", "NOCYCLE", "NONE", "NOT", "NOTNULL", "NULL", "NULLIF", "NUMBER", "NUMERIC",
+            "NVARCHAR", "NVARCHAR2", "NVL", "OFFSET", "ON", "ONLY", "OR", "ORDER", "OUT", "OUTER", "OVERLAPS", "OVERLAY", "PERFORMANCE", "PLACING", "POSITION", "PRECISION", "PRIMARY", "PRIORER",
+            "PROCEDURE", "REAL", "RECYCLEBIN", "REFERENCES", "REJECT", "RETURNING", "RIGHT", "ROW", "ROWNUM", "SELECT", "SESSION_USER", "SETOF", "SIMILAR", "SMALLDATETIME", "SMALLINT", "SOME",
+            "SUBSTRING", "SYMMETRIC", "SYSDATE", "TABLE", "TABLESAMPLE", "THEN", "TIME", "TIMECAPSULE", "TIMESTAMP", "TIMESTAMPDIFF", "TINYINT", "TO", "TRAILING", "TREAT", "TRIM", "TRUE", "UNION",
+            "UNIQUE", "USER", "USING", "VALUES", "VARCHAR", "VARCHAR2", "VARIADIC", "VERBOSE", "VERIFY", "WHEN", "WHERE", "WINDOW", "WITH", "XMLATTRIBUTES", "XMLCONCAT", "XMLELEMENT", "XMLEXISTS",
+            "XMLFOREST", "XMLPARSE", "XMLPI", "XMLROOT", "XMLSERIALIZE");
+    
+    @Override
+    protected boolean isKeyword(final String item) {
+        return RESERVED_KEYWORDS.contains(item.toUpperCase(java.util.Locale.ROOT)); // fixed locale so keyword matching does not depend on the JVM default locale
+    }
+    
+    @Override
+    protected String getLeftIdentifierQuoteString() {
+        return "\"";
+    }
+    
+    @Override
+    protected String getRightIdentifierQuoteString() {
+        return "\"";
+    }
+    
+    @Override
+    public String buildInsertSQL(final Record record) {
+        String insertSql = super.buildInsertSQL(record);
+        if (!record.getTableMetaData().getUniqueKeyNamesList().isEmpty()) {
+            return insertSql + " ON DUPLICATE KEY UPDATE NOTHING";
+        }
+        return insertSql;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilder.java
similarity index 50%
copy from kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
copy to kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilder.java
index 90d0ab31c6e..33d698a33fa 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilder.java
@@ -15,37 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
 
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
-
-import java.util.List;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 
 /**
- * Start CDC client parameter.
+ * SQL builder.
  */
-@Getter
-@Setter
-public final class StartCDCClientParameter {
-    
-    private String address;
-    
-    private int port;
-    
-    private String username;
-    
-    private String password;
-    
-    private String database;
-    
-    private List<TableName> subscribeTables;
-    
-    private String subscriptionName;
-    
-    private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
-    
-    private boolean incrementalGlobalOrderly;
+public interface SQLBuilder {
+    
+    /**
+     * Build insert SQL.
+     *
+     * @param record data record
+     * @return insert SQL
+     */
+    String buildInsertSQL(Record record);
+    
+    /**
+     * Build update SQL.
+     *
+     * @param record data record
+     * @return update SQL
+     */
+    String buildUpdateSQL(Record record);
+    
+    /**
+     * Build delete SQL.
+     *
+     * @param record data record
+     * @return delete SQL
+     */
+    String buildDeleteSQL(Record record);
 }
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
new file mode 100644
index 00000000000..934a2768c3a
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+/**
+ * SQL builder factory.
+ */
+public final class SQLBuilderFactory {
+    
+    /**
+     * Get SQL builder.
+     *
+     * @param databaseType database type, only openGauss has a builder here; any other value raises UnsupportedOperationException
+     * @return SQL builder
+     */
+    public static SQLBuilder getSQLBuilder(final String databaseType) {
+        switch (databaseType) {
+            case "openGauss":
+                return new OpenGaussSQLBuilder();
+            default:
+                throw new UnsupportedOperationException(String.format("Not supported %s now", databaseType));
+        }
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
new file mode 100644
index 00000000000..71f39a31cec
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.util;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.util.JsonFormat;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BlobValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ClobValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.LocalTimeValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.NullValue;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalTime;
+
+/**
+ * Any value convert.
+ */
+public final class AnyValueConvert {
+    
+    /**
+     * Convert protobuf any to the matching Java object.
+     *
+     * @param any any
+     * @return unpacked Java value, null for a null or NullValue any, or the JSON text of any other message type
+     * @throws InvalidProtocolBufferException invalid protocol buffer exception
+     */
+    public static Object convertToObject(final Any any) throws InvalidProtocolBufferException {
+        if (null == any || any.is(NullValue.class)) {
+            return null;
+        }
+        if (any.is(StringValue.class)) {
+            return any.unpack(StringValue.class).getValue();
+        }
+        if (any.is(Int32Value.class)) {
+            return any.unpack(Int32Value.class).getValue();
+        }
+        if (any.is(Int64Value.class)) {
+            return any.unpack(Int64Value.class).getValue();
+        }
+        if (any.is(BigIntegerValue.class)) {
+            return new BigInteger(any.unpack(BigIntegerValue.class).getValue().toByteArray());
+        }
+        if (any.is(FloatValue.class)) {
+            return any.unpack(FloatValue.class).getValue();
+        }
+        if (any.is(DoubleValue.class)) {
+            return any.unpack(DoubleValue.class).getValue();
+        }
+        if (any.is(BigDecimalValue.class)) {
+            return new BigDecimal(any.unpack(BigDecimalValue.class).getValue());
+        }
+        if (any.is(BoolValue.class)) {
+            return any.unpack(BoolValue.class).getValue();
+        }
+        if (any.is(BytesValue.class)) {
+            return any.unpack(BytesValue.class).getValue().toByteArray();
+        }
+        if (any.is(com.google.protobuf.Timestamp.class)) {
+            return convertProtobufTimestamp(any.unpack(com.google.protobuf.Timestamp.class));
+        }
+        if (any.is(LocalTimeValue.class)) {
+            return LocalTime.parse(any.unpack(LocalTimeValue.class).getValue());
+        }
+        if (any.is(ClobValue.class)) {
+            return any.unpack(ClobValue.class).getValue();
+        }
+        if (any.is(BlobValue.class)) {
+            return any.unpack(BlobValue.class).getValue().toByteArray();
+        }
+        // Message types without a dedicated branch above are returned as their JSON representation.
+        return JsonFormat.printer().includingDefaultValueFields().print(any);
+    }
+    
+    // Timestamp.from keeps the full nanosecond fraction of the instant; converting through
+    // toEpochMilli() would truncate everything below one millisecond.
+    private static Timestamp convertProtobufTimestamp(final com.google.protobuf.Timestamp timestamp) {
+        return Timestamp.from(Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()));
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
new file mode 100644
index 00000000000..f7c81de30e5
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.example.opengauss;
+
+import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+
+import java.util.Collections;
+
+public final class Bootstrap {
+    
+    /**
+     * Main entrance.
+     *
+     * @param args args
+     */
+    public static void main(final String[] args) {
+        StartCDCClientParameter parameter = new StartCDCClientParameter();
+        parameter.setAddress("127.0.0.1");
+        parameter.setPort(33071);
+        parameter.setUsername("root");
+        parameter.setPassword("root");
+        parameter.setDatabase("sharding_db");
+        parameter.setSubscriptionMode(SubscriptionMode.FULL);
+        parameter.setSubscriptionName("subscribe_sharding_db");
+        parameter.setIncrementalGlobalOrderly(true);
+        parameter.setSubscribeTables(Collections.singletonList(TableName.newBuilder().setName("t_order").build()));
+        parameter.setDatabaseType("openGauss");
+        CDCClient cdcClient = new CDCClient(parameter);
+        cdcClient.start();
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties b/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties
new file mode 100644
index 00000000000..88f4616e123
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+url=127.0.0.1
+port=5432
+database=cdc_db
+username=gaussdb
+password=Root@123
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 5b7f7c08272..9b066b6dee8 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -117,7 +117,9 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
     
     @Override
     protected void doStop() {
-        importerConnector.clean();
-        CDCAckHolder.getInstance().cleanUp(this);
+        if (ImporterType.INCREMENTAL == importerType) {
+            importerConnector.clean(this);
+            CDCAckHolder.getInstance().cleanUp(this);
+        }
     }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index 257f2324d71..881629588ba 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -65,7 +65,7 @@ public final class CDCImporterConnector implements ImporterConnector {
     private final Condition condition = lock.newCondition();
     
     @Setter
-    private volatile boolean running = true;
+    private volatile boolean incrementalTaskRunning = true;
     
     @Getter
     private final String database;
@@ -127,10 +127,10 @@ public final class CDCImporterConnector implements ImporterConnector {
     }
     
     private void writeImmediately(final List<? extends Record> recordList, final Map<CDCImporter, CDCAckPosition> importerDataRecordMap) {
-        while (!channel.isWritable() && channel.isActive() && running) {
+        while (!channel.isWritable() && channel.isActive()) {
             doAwait();
         }
-        if (!channel.isActive() || !running) {
+        if (!channel.isActive()) {
             return;
         }
         List<DataRecordResult.Record> records = new LinkedList<>();
@@ -189,10 +189,14 @@ public final class CDCImporterConnector implements ImporterConnector {
     
     /**
      * Clean CDC importer connector.
+     *
+     * @param cdcImporter CDC importer
      */
-    public void clean() {
-        running = false;
-        incrementalRecordMap.clear();
+    public void clean(final CDCImporter cdcImporter) {
+        incrementalRecordMap.remove(cdcImporter);
+        if (ImporterType.INCREMENTAL == cdcImporter.getImporterType()) {
+            incrementalTaskRunning = false;
+        }
     }
     
     @Override
@@ -207,7 +211,7 @@ public final class CDCImporterConnector implements ImporterConnector {
         
         @Override
         public void run() {
-            while (running) {
+            while (incrementalTaskRunning) {
                 Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
                 List<DataRecord> dataRecords = new LinkedList<>();
                 for (int i = 0; i < batchSize; i++) {
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index f3272de2ceb..6479e96d100 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -29,11 +29,11 @@ import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfigurat
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
+import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
@@ -71,7 +71,7 @@ public final class CDCJob extends AbstractSimplePipelineJob {
     
     protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
         InventoryIncrementalJobItemContext jobItemContext = (InventoryIncrementalJobItemContext) pipelineJobItemContext;
-        return new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
+        return new CDCTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
     }
     
     @Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
new file mode 100644
index 00000000000..1497458c92d
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.task;
+
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+
+/**
+ * CDC tasks runner.
+ */
+public final class CDCTasksRunner extends InventoryIncrementalTasksRunner {
+    
+    public CDCTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<InventoryTask> inventoryTasks, final Collection<IncrementalTask> incrementalTasks) {
+        super(jobItemContext, inventoryTasks, incrementalTasks);
+    }
+    
+    @Override
+    protected void inventorySuccessCallback() {
+        executeIncrementalTask();
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
index 9108ed6af2e..4e85c3b2e14 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
@@ -27,7 +27,9 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordR
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -44,13 +46,18 @@ public final class DataRecordResultConvertUtil {
      * @return record
      */
     public static Record convertDataRecordToRecord(final String database, final String schema, final DataRecord dataRecord) {
-        Map<String, Any> beforeMap = new HashMap<>();
-        Map<String, Any> afterMap = new HashMap<>();
+        Map<String, Any> beforeMap = new LinkedHashMap<>();
+        Map<String, Any> afterMap = new LinkedHashMap<>();
+        List<String> uniqueKeyNames = new LinkedList<>();
         for (Column column : dataRecord.getColumns()) {
             beforeMap.put(column.getName(), Any.pack(ColumnValueConvertUtil.convertToProtobufMessage(column.getOldValue())));
             afterMap.put(column.getName(), Any.pack(ColumnValueConvertUtil.convertToProtobufMessage(column.getValue())));
+            if (column.isUniqueKey()) {
+                uniqueKeyNames.add(column.getName());
+            }
         }
-        TableMetaData metaData = TableMetaData.newBuilder().setDatabase(database).setSchema(Strings.nullToEmpty(schema)).setTableName(dataRecord.getTableName()).build();
+        TableMetaData metaData = TableMetaData.newBuilder().setDatabase(database).setSchema(Strings.nullToEmpty(schema)).setTableName(dataRecord.getTableName())
+                .addAllUniqueKeyNames(uniqueKeyNames).build();
         DataChangeType dataChangeType = DataChangeType.UNKNOWN;
         if (IngestDataChangeType.INSERT.equals(dataRecord.getType())) {
             dataChangeType = DataChangeType.INSERT;
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
index 49c82f09433..dd9a8158918 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -83,6 +83,7 @@ message DataRecordResult {
       string database = 1;
       optional string schema = 2;
       string table_name = 3;
+      repeated string unique_key_names = 4;
     }
     TableMetaData table_meta_data = 3;
     int64 transaction_commit_millis = 4;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 86b4fce8465..59b18b69389 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -40,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
  */
 @RequiredArgsConstructor
 @Slf4j
-public final class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
+public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
     
     @Getter
     private final PipelineJobItemContext jobItemContext;
@@ -102,7 +102,7 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
         jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
     }
     
-    private synchronized void executeIncrementalTask() {
+    protected synchronized void executeIncrementalTask() {
         if (incrementalTasks.isEmpty()) {
             log.info("incrementalTasks empty, ignore");
             return;
@@ -122,16 +122,20 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
         ExecuteEngine.trigger(futures, new IncrementalExecuteCallback());
     }
     
+    protected void inventorySuccessCallback() {
+        if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
+            log.info("onSuccess, all inventory tasks finished.");
+            executeIncrementalTask();
+        } else {
+            log.info("onSuccess, inventory tasks not finished");
+        }
+    }
+    
     private final class InventoryTaskExecuteCallback implements ExecuteCallback {
         
         @Override
         public void onSuccess() {
-            if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
-                log.info("onSuccess, all inventory tasks finished.");
-                executeIncrementalTask();
-            } else {
-                log.info("onSuccess, inventory tasks not finished");
-            }
+            inventorySuccessCallback();
         }
         
         @Override