You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/15 02:24:05 UTC

[shardingsphere] branch master updated: Add test fields for openGauss at pipeline E2E (#25608)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d67ba69b5b4 Add test fields for openGauss at pipeline E2E (#25608)
d67ba69b5b4 is described below

commit d67ba69b5b4e38af3fbd7c0c849c875ee3d97f07
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon May 15 10:23:57 2023 +0800

    Add test fields for openGauss at pipeline E2E (#25608)
    
    * Add test fields for openGauss/PostgreSQL at pipeline E2E
    
    * Add jdbcUrl parameter for fix bit(n) column migration
    
    * Data match add array equals method
    
    * Improve column names
---
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  3 +
 .../OpenGaussJdbcQueryPropertiesExtension.java     |  2 +
 .../ingest/wal/OpenGaussColumnValueReader.java}    | 48 ++++++-------
 ...ta.pipeline.spi.ingest.dumper.ColumnValueReader | 18 +++++
 .../OpenGaussJdbcQueryPropertiesExtensionTest.java |  2 +-
 .../ingest/PostgreSQLColumnValueReader.java        |  9 ---
 .../pipeline/cases/PipelineContainerComposer.java  | 54 +++++++-------
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  6 +-
 .../cases/migration/AbstractMigrationE2EIT.java    |  8 +--
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  8 +--
 .../pipeline/cases/task/E2EIncrementalTask.java    | 29 ++++++--
 .../framework/helper/PipelineCaseHelper.java       | 22 +++++-
 .../resources/env/scenario/general/opengauss.xml   | 82 ++++++++++++++++++++++
 13 files changed, 210 insertions(+), 81 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index c6ea5031b44..01b30ae060f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -42,6 +42,7 @@ import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.math.BigDecimal;
+import java.sql.Array;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -296,6 +297,8 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                         matched = ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
                     } else if (thisResult instanceof BigDecimal && thatResult instanceof BigDecimal) {
                         matched = DataConsistencyCheckUtils.isBigDecimalEquals((BigDecimal) thisResult, (BigDecimal) thatResult);
+                    } else if (thisResult instanceof Array && thatResult instanceof Array) {
+                        matched = Objects.deepEquals(((Array) thisResult).getArray(), ((Array) thatResult).getArray());
                     } else {
                         matched = equalsBuilder.append(thisResult, thatResult).isEquals();
                     }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
index c2952349a32..616631728d9 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
@@ -30,6 +30,8 @@ public final class OpenGaussJdbcQueryPropertiesExtension implements JdbcQueryPro
     
     @Override
     public Properties extendQueryProperties() {
+        queryProps.setProperty("stringtype", "unspecified");
+        queryProps.setProperty("bitToString", "true");
         return queryProps;
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
similarity index 54%
copy from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
copy to kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
index a84844018bb..b299be37ac0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
@@ -15,62 +15,58 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
+package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal;
 
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractColumnValueReader;
-import org.postgresql.util.PGobject;
 
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
-import java.util.Collection;
-import java.util.Collections;
 
 /**
- * Column value reader for PostgreSQL.
+ * Column value reader for openGauss.
  */
-public final class PostgreSQLColumnValueReader extends AbstractColumnValueReader {
+public final class OpenGaussColumnValueReader extends AbstractColumnValueReader {
     
-    private static final Collection<String> TYPE_ALIASES = Collections.singletonList("openGauss");
+    private static final String MONEY_TYPE = "money";
     
-    private static final String PG_MONEY_TYPE = "money";
+    private static final String BIT_TYPE = "bit";
     
-    private static final String PG_BIT_TYPE = "bit";
+    private static final String BOOL_TYPE = "bool";
     
     @Override
     protected Object doReadValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
-        if (isPgMoneyType(metaData, columnIndex)) {
+        if (isMoneyType(metaData, columnIndex)) {
             return resultSet.getBigDecimal(columnIndex);
         }
-        if (isPgBitType(metaData, columnIndex)) {
-            PGobject result = new PGobject();
-            result.setType("bit");
-            Object bitValue = resultSet.getObject(columnIndex);
-            result.setValue(null == bitValue ? null : (Boolean) bitValue ? "1" : "0");
-            return result;
+        if (isBitType(metaData, columnIndex)) {
+            // The openGauss JDBC driver can't parse bit(n) correctly when n > 1, so bitToString is already added to the JDBC URL; here we just return the value as a string
+            return resultSet.getString(columnIndex);
+        }
+        if (isBoolType(metaData, columnIndex)) {
+            return resultSet.getBoolean(columnIndex);
         }
         return super.defaultDoReadValue(resultSet, metaData, columnIndex);
     }
     
-    private boolean isPgMoneyType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
-        return PG_MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+    private boolean isMoneyType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
+        return MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+    }
+    
+    private boolean isBoolType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
+        return BOOL_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
     }
     
-    private boolean isPgBitType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
+    private boolean isBitType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
         if (Types.BIT == resultSetMetaData.getColumnType(index)) {
-            return PG_BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+            return BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
         }
         return false;
     }
     
     @Override
     public String getType() {
-        return "PostgreSQL";
-    }
-    
-    @Override
-    public Collection<String> getTypeAliases() {
-        return TYPE_ALIASES;
+        return "openGauss";
     }
 }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
new file mode 100644
index 00000000000..c2616b6fa02
--- /dev/null
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussColumnValueReader
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java
index 97a7227bd65..9968d05953d 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java
@@ -40,6 +40,6 @@ class OpenGaussJdbcQueryPropertiesExtensionTest {
     private void assertExtension(final JdbcQueryPropertiesExtension actual) {
         assertThat(actual, instanceOf(OpenGaussJdbcQueryPropertiesExtension.class));
         assertThat(actual.getType(), equalTo("openGauss"));
-        assertTrue(actual.extendQueryProperties().isEmpty());
+        assertThat(actual.extendQueryProperties().size(), equalTo(2));
     }
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
index a84844018bb..0cfdbdb6cdb 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
@@ -24,16 +24,12 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
-import java.util.Collection;
-import java.util.Collections;
 
 /**
  * Column value reader for PostgreSQL.
  */
 public final class PostgreSQLColumnValueReader extends AbstractColumnValueReader {
     
-    private static final Collection<String> TYPE_ALIASES = Collections.singletonList("openGauss");
-    
     private static final String PG_MONEY_TYPE = "money";
     
     private static final String PG_BIT_TYPE = "bit";
@@ -68,9 +64,4 @@ public final class PostgreSQLColumnValueReader extends AbstractColumnValueReader
     public String getType() {
         return "PostgreSQL";
     }
-    
-    @Override
-    public Collection<String> getTypeAliases() {
-        return TYPE_ALIASES;
-    }
 }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 19b903cf565..99a8fb2fe11 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -123,7 +123,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
         }
         extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineContainerComposer.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
         containerComposer.start();
-        sourceDataSource = StorageContainerUtils.generateDataSource(appendExtraParameter(getActualJdbcUrlTemplate(DS_0, false)), username, password);
+        sourceDataSource = StorageContainerUtils.generateDataSource(getActualJdbcUrlTemplate(DS_0, false), username, password);
         proxyDataSource = StorageContainerUtils.generateDataSource(
                 appendExtraParameter(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
         init(jobType);
@@ -198,7 +198,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Append extra parameter.
-     * 
+     *
      * @param jdbcUrl JDBC URL
      * @return appended JDBC URL
      */
@@ -207,14 +207,14 @@ public final class PipelineContainerComposer implements AutoCloseable {
             return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("rewriteBatchedStatements", Boolean.TRUE.toString())));
         }
         if (DatabaseTypeUtils.isPostgreSQL(databaseType) || DatabaseTypeUtils.isOpenGauss(databaseType)) {
-            return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified")));
+            return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified"), new Property("bitToString", Boolean.TRUE.toString())));
         }
         return jdbcUrl;
     }
     
     /**
      * Register storage unit.
-     * 
+     *
      * @param storageUnitName storage unit name
      * @throws SQLException SQL exception
      */
@@ -222,13 +222,13 @@ public final class PipelineContainerComposer implements AutoCloseable {
         String registerStorageUnitTemplate = "REGISTER STORAGE UNIT ${ds} ( URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", storageUnitName)
                 .replace("${user}", getUsername())
                 .replace("${password}", getPassword())
-                .replace("${url}", appendExtraParameter(getActualJdbcUrlTemplate(storageUnitName, true)));
+                .replace("${url}", getActualJdbcUrlTemplate(storageUnitName, true));
         proxyExecuteWithLog(registerStorageUnitTemplate, 2);
     }
     
     /**
      * Add resource.
-     * 
+     *
      * @param distSQL dist SQL
      * @throws SQLException SQL exception
      */
@@ -239,7 +239,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Get actual JDBC URL template.
-     * 
+     *
      * @param databaseName database name
      * @param isInContainer is in container
      * @param storageContainerIndex storage container index
@@ -257,18 +257,18 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Get actual JDBC URL template.
-     * 
+     *
      * @param databaseName database name
      * @param isInContainer is in container
      * @return actual JDBC URL template
      */
     public String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer) {
-        return getActualJdbcUrlTemplate(databaseName, isInContainer, 0);
+        return appendExtraParameter(getActualJdbcUrlTemplate(databaseName, isInContainer, 0));
     }
     
     /**
      * Create schema.
-     * 
+     *
      * @param connection connection
      * @param sleepSeconds sleep seconds
      * @throws SQLException SQL exception
@@ -286,8 +286,8 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Create source order table.
-     * 
-     * @param sourceTableName source table name 
+     *
+     * @param sourceTableName source table name
      * @throws SQLException SQL exception
      */
     public void createSourceOrderTable(final String sourceTableName) throws SQLException {
@@ -296,7 +296,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Create source table index list.
-     * 
+     *
      * @param schema schema
      * @param sourceTableName source table name
      * @throws SQLException SQL exception
@@ -311,7 +311,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Create source comment on list.
-     * 
+     *
      * @param schema schema
      * @param sourceTableName source table name
      * @throws SQLException SQL exception
@@ -322,7 +322,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Create source order item table.
-     * 
+     *
      * @throws SQLException SQL exception
      */
     public void createSourceOrderItemTable() throws SQLException {
@@ -331,7 +331,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Source execute with log.
-     * 
+     *
      * @param sql SQL
      * @throws SQLException SQL exception
      */
@@ -344,7 +344,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Proxy execute with log.
-     * 
+     *
      * @param sql SQL
      * @param sleepSeconds sleep seconds
      * @throws SQLException SQL exception
@@ -360,7 +360,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Wait job prepare success.
-     * 
+     *
      * @param distSQL dist SQL
      */
     @SneakyThrows(InterruptedException.class)
@@ -376,7 +376,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Query for list with log.
-     * 
+     *
      * @param sql SQL
      * @return query result
      * @throws RuntimeException runtime exception
@@ -399,7 +399,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Transform result set to list.
-     * 
+     *
      * @param resultSet result set
      * @return transformed result
      * @throws SQLException SQL exception
@@ -420,7 +420,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Start increment task.
-     * 
+     *
      * @param baseIncrementTask base increment task
      */
     public void startIncrementTask(final BaseIncrementTask baseIncrementTask) {
@@ -430,7 +430,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Wait increment task finished.
-     * 
+     *
      * @param distSQL dist SQL
      * @return result
      * @throws InterruptedException interrupted exception
@@ -464,7 +464,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Assert proxy order record exist.
-     * 
+     *
      * @param tableName table name
      * @param orderId order id
      */
@@ -480,7 +480,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Assert proxy order record exist.
-     * 
+     *
      * @param sql SQL
      */
     @SneakyThrows(InterruptedException.class)
@@ -499,7 +499,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Get target table records count.
-     * 
+     *
      * @param tableName table name
      * @return target table records count
      */
@@ -511,7 +511,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Assert greater than order table init rows.
-     * 
+     *
      * @param tableInitRows table init rows
      * @param schema schema
      */
@@ -523,7 +523,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
     
     /**
      * Generate ShardingSphere data source from proxy.
-     * 
+     *
      * @return ShardingSphere data source
      * @throws SQLException SQL exception
      */
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 133bd0e8495..1967d7d97be 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -79,7 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @PipelineE2ESettings(database = {
         @PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/scenario/general/mysql.xml"),
-        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/postgresql.xml")})
+        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/opengauss.xml")})
 @Slf4j
 class CDCE2EIT {
     
@@ -108,7 +108,7 @@ class CDCE2EIT {
             }
             createOrderTableRule(containerComposer);
             try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
-                initSchemaAndTable(containerComposer, connection, 2);
+                initSchemaAndTable(containerComposer, connection, 3);
             }
             DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
             Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
@@ -170,7 +170,7 @@ class CDCE2EIT {
     }
     
     private void startCDCClient(final PipelineContainerComposer containerComposer) {
-        DataSource dataSource = StorageContainerUtils.generateDataSource(containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false)),
+        DataSource dataSource = StorageContainerUtils.generateDataSource(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
                 containerComposer.getUsername(), containerComposer.getPassword());
         StartCDCClientParameter parameter = new StartCDCClientParameter();
         parameter.setAddress("localhost");
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index a3fa80535d2..60d7960e9ed 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -61,16 +61,16 @@ public abstract class AbstractMigrationE2EIT {
         }
         String addSourceResource = migrationDistSQL.getRegisterMigrationSourceStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
                 .replace("${password}", containerComposer.getPassword())
-                .replace("${ds0}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true)));
+                .replace("${ds0}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true));
         containerComposer.addResource(addSourceResource);
     }
     
     protected void addMigrationTargetResource(final PipelineContainerComposer containerComposer) throws SQLException {
         String addTargetResource = migrationDistSQL.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
                 .replace("${password}", containerComposer.getPassword())
-                .replace("${ds2}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true)))
-                .replace("${ds3}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true)))
-                .replace("${ds4}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true)));
+                .replace("${ds2}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true))
+                .replace("${ds3}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true))
+                .replace("${ds4}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true));
         containerComposer.addResource(addTargetResource);
         List<Map<String, Object>> resources = containerComposer.queryForListWithLog("SHOW STORAGE UNITS from sharding_db");
         assertThat(resources.size(), is(3));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index f518c43e58d..51fff681a5c 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -47,11 +47,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @PipelineE2ESettings(database = {
         @PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = "env/scenario/general/postgresql.xml"),
-        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/postgresql.xml")})
+        @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/opengauss.xml")})
 @Slf4j
 class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     
-    private static final String SOURCE_TABLE_NAME = "t_order_copy";
+    private static final String SOURCE_TABLE_NAME = "t_order";
     
     private static final String TARGET_TABLE_NAME = "t_order";
     
@@ -97,12 +97,12 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException, InterruptedException {
         containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(containerComposer, jobId);
+        // Table metadata must be refreshed first; otherwise the proxy can't get schema and table info
+        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
         long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
         containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
                 String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
         startMigrationByJobId(containerComposer, jobId);
-        // must refresh firstly, otherwise proxy can't get schema and table info
-        containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
         Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
                 String.format("SELECT * FROM %s WHERE order_id = %s", String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId)).isEmpty());
         containerComposer.assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 0e211b4e7e6..72049d85340 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -19,9 +19,11 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
@@ -40,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 @RequiredArgsConstructor
@@ -53,6 +56,10 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
     private static final List<String> POSTGRESQL_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "t_int2", "t_numeric", "t_bool", "t_bytea", "t_char", "t_varchar", "t_float",
             "t_double", "t_json", "t_jsonb", "t_text", "t_date", "t_time", "t_timestamp", "t_timestamptz");
     
+    private static final List<String> OPENGAUSS_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "c_int", "c_smallint", "c_float", "c_double", "c_numeric", "c_boolean", "c_char",
+            "c_text", "c_bytea", "c_date", "c_time", "c_smalldatetime", "c_timestamp", "c_timestamptz", "c_interval", "c_array", "c_json", "c_jsonb", "c_uuid", "c_hash32", "c_tsvector", "c_bit",
+            "c_int4range", "c_reltime", "c_abstime", "c_point", "c_lseg", "c_box", "c_circle", "c_bitvarying", "c_cidr", "c_inet", "c_macaddr", "c_hll");
+    
     private final DataSource dataSource;
     
     private final String orderTableName;
@@ -88,8 +95,10 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
         String sql;
         if (databaseType instanceof MySQLDatabaseType) {
             sql = SQLBuilderUtils.buildInsertSQL(MYSQL_COLUMN_NAMES, orderTableName);
-        } else if (databaseType instanceof SchemaSupportedDatabaseType) {
+        } else if (databaseType instanceof PostgreSQLDatabaseType) {
             sql = SQLBuilderUtils.buildInsertSQL(POSTGRESQL_COLUMN_NAMES, orderTableName);
+        } else if (databaseType instanceof OpenGaussDatabaseType) {
+            sql = SQLBuilderUtils.buildInsertSQL(OPENGAUSS_COLUMN_NAMES, orderTableName);
         } else {
             throw new UnsupportedOperationException();
         }
@@ -110,13 +119,25 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
             DataSourceExecuteUtils.execute(dataSource, sql, parameters);
             return;
         }
-        if (databaseType instanceof SchemaSupportedDatabaseType) {
+        if (databaseType instanceof PostgreSQLDatabaseType) {
             String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), orderTableName, "?");
-            Object[] parameters = {"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
+            Object[] parameters = {"中文测试", randomInt, BigDecimal.valueOf(10000), random.nextBoolean(), new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
                     PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
                     LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId};
             log.info("update sql: {}, params: {}", sql, parameters);
             DataSourceExecuteUtils.execute(dataSource, sql, parameters);
+            return;
+        }
+        if (databaseType instanceof OpenGaussDatabaseType) {
+            LocalDateTime now = LocalDateTime.now();
+            String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(OPENGAUSS_COLUMN_NAMES), orderTableName, "?");
+            Object[] parameters = {"中文测试", randomInt, random.nextInt(-999, 999), PipelineCaseHelper.generateFloat(), PipelineCaseHelper.generateDouble(), BigDecimal.valueOf(10000),
+                    random.nextBoolean(), "update-char", "update-text", "update-bytea".getBytes(), now.toLocalDate().plusDays(1), now.toLocalTime().plusHours(6), "2023-03-01", now,
+                    OffsetDateTime.now(), "1 years 1 mons 1 days 1 hours 1 mins 1 secs", "{4, 5, 6}", PipelineCaseHelper.generateJsonString(1, true), PipelineCaseHelper.generateJsonString(1, false),
+                    UUID.randomUUID().toString(), DigestUtils.md5Hex(now.toString()), null, "1111", "[1,10000)", "2 years 2 mons 2 days 06:00:00", "2023-01-01 00:00:00+00", "(2.0,2.0)",
+                    "[(0.0,0.0),(3.0,3.0)]", "(1.0,1.0),(3.0,3.0)", "<(5.0,5.0),1.0>", "1010", "192.168.0.0/24", "192.168.1.1", "08:00:3b:01:02:03", null, orderId};
+            log.info("update sql: {}, params: {}", sql, parameters);
+            DataSourceExecuteUtils.execute(dataSource, sql, parameters);
         }
     }
     
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
index c6b9cf1ec82..04a3400cb46 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
@@ -21,11 +21,13 @@ import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 
@@ -41,6 +43,7 @@ import java.time.OffsetDateTime;
 import java.time.Year;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -91,7 +94,7 @@ public final class PipelineCaseHelper {
             }
             return result;
         }
-        if (databaseType instanceof SchemaSupportedDatabaseType) {
+        if (databaseType instanceof PostgreSQLDatabaseType) {
             for (int i = 0; i < insertRows; i++) {
                 Object orderId = keyGenerateAlgorithm.generateKey();
                 result.add(new Object[]{orderId, generateInt(0, 100), generateString(6), generateInt(-128, 127),
@@ -101,6 +104,19 @@ public final class PipelineCaseHelper {
             }
             return result;
         }
+        if (databaseType instanceof OpenGaussDatabaseType) {
+            for (int i = 0; i < insertRows; i++) {
+                Object orderId = keyGenerateAlgorithm.generateKey();
+                // TODO openGauss mpp plugin parses single quotes incorrectly
+                result.add(new Object[]{orderId, generateInt(0, 1000), "status" + i, generateInt(-1000, 9999), generateInt(0, 100), generateFloat(), generateDouble(),
+                        BigDecimal.valueOf(generateDouble()), false, generateString(6), "texts", "bytea".getBytes(), LocalDate.now(), LocalTime.now(), "2001-10-01",
+                        Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), "0 years 0 mons 1 days 2 hours 3 mins 4 secs", "{1, 2, 3}", generateJsonString(8, false),
+                        generateJsonString(8, true), UUID.randomUUID().toString(), DigestUtils.md5Hex(orderId.toString()), null, "0000", "[1,1000)",
+                        "1 years 1 mons 10 days -06:00:00", "2000-01-02 00:00:00+00", "(1.0,1.0)", "[(0.0,0.0),(2.0,2.0)]", "(3.0,3.0),(1.0,1.0)", "<(5.0,5.0),5.0>", "1111",
+                        "192.168.0.0/16", "192.168.1.1", "08:00:2b:01:02:03", "\\x484c4c00000000002b05000000000000000000000000000000000000"});
+            }
+            return result;
+        }
         throw new UnsupportedOperationException("now support generate %s insert data");
     }
     
@@ -109,7 +125,7 @@ public final class PipelineCaseHelper {
     }
     
     private static String generateString(final int strLength) {
-        return RandomStringUtils.randomAlphabetic(strLength);
+        return RandomStringUtils.randomAlphanumeric(strLength);
     }
     
     /**
diff --git a/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml b/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml
new file mode 100644
index 00000000000..c7da6ef3ab4
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml
@@ -0,0 +1,82 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<command>
+    <create-table-order>
+        create table test.t_order (
+        order_id bigint,
+        user_id integer,
+        status character varying(50),
+        c_int integer,
+        c_smallint smallint,
+        c_float real,
+        c_double double precision,
+        c_numeric numeric(10,2),
+        c_boolean boolean,
+        c_char character(32),
+        c_text text,
+        c_bytea bytea,
+        c_date date,
+        c_time time without time zone,
+        c_smalldatetime smalldatetime,
+        c_timestamp timestamp without time zone,
+        c_timestamptz timestamp with time zone,
+        c_interval interval,
+        c_array integer[],
+        c_json json,
+        c_jsonb jsonb,
+        c_uuid uuid,
+        c_hash32 hash32,
+        c_tsvector tsvector,
+        c_bit bit(4),
+        c_int4range int4range,
+        c_reltime reltime,
+        c_abstime abstime,
+        c_point point,
+        c_lseg lseg,
+        c_box box,
+        c_circle circle,
+        c_bitvarying bit varying(32),
+        c_cidr cidr,
+        c_inet inet,
+        c_macaddr macaddr,
+        c_hll hll(14,10,12,0),
+        PRIMARY KEY ( order_id )
+        );
+    </create-table-order>
+
+    <full-insert-order>
+        INSERT INTO test.t_order (
+        order_id, user_id, status, c_int, c_smallint, c_float, c_double, c_numeric, c_boolean, c_char, c_text, c_bytea, c_date, c_time,
+        c_smalldatetime, c_timestamp, c_timestamptz, c_interval, c_array, c_json, c_jsonb, c_uuid, c_hash32, c_tsvector, c_bit,
+        c_int4range, c_reltime, c_abstime, c_point, c_lseg, c_box, c_circle, c_bitvarying, c_cidr, c_inet, c_macaddr, c_hll
+        ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
+    </full-insert-order>
+
+    <create-table-order-item>
+        CREATE TABLE test.t_order_item (
+        item_id int8 NOT NULL,
+        order_id int8 NOT NULL,
+        user_id int4 NOT NULL,
+        status varchar(50),
+        PRIMARY KEY (item_id)
+        )
+    </create-table-order-item>
+
+    <full-insert-order-item>
+        INSERT INTO test.t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)
+    </full-insert-order-item>
+</command>