You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/16 09:14:05 UTC

[shardingsphere] branch master updated: Enable CDC E2E IT (#24650)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new da27912ed3f Enable CDC E2E IT (#24650)
da27912ed3f is described below

commit da27912ed3fa851466cbe7638ed6886fa737ec48
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Mar 16 17:13:53 2023 +0800

    Enable CDC E2E IT (#24650)
    
    * Enable CDC E2E IT
    
    * Add struct convert and unit test
    
    * Extract batchInsertRecords method
---
 .../cdc/client/util/ProtobufAnyValueConverter.java |   5 +
 .../client/util/ProtobufAnyValueConverterTest.java |  75 ++++++++
 .../cdc/util/DataRecordResultConvertUtilTest.java  |   1 +
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  13 +-
 .../cases/cdc/DataSourceRecordConsumer.java        | 199 +++++++++++++++++++++
 .../pipeline/cases/task/E2EIncrementalTask.java    |  38 +---
 .../e2e/data/pipeline/util/SQLBuilderUtil.java     |  77 ++++++++
 7 files changed, 372 insertions(+), 36 deletions(-)

diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverter.java
index c795a88f328..cd5048d4427 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverter.java
@@ -27,8 +27,10 @@ import com.google.protobuf.Int32Value;
 import com.google.protobuf.Int64Value;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.StringValue;
+import com.google.protobuf.Struct;
 import com.google.protobuf.UInt32Value;
 import com.google.protobuf.UInt64Value;
+import com.google.protobuf.util.JsonFormat;
 import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Timestamp;
@@ -83,6 +85,9 @@ public final class ProtobufAnyValueConverter {
         if (any.is(com.google.protobuf.Timestamp.class)) {
             return converProtobufTimestamp(any.unpack(com.google.protobuf.Timestamp.class));
         }
+        if (any.is(Struct.class)) {
+            return JsonFormat.printer().print(any.unpack(Struct.class));
+        }
         // TODO can't use JsonFormat, might change the original value without error prompt. there need to cover more types,
         log.error("not support unpack value={}", any);
         throw new UnsupportedOperationException(String.format("not support unpack the type %s", any.getTypeUrl()));
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
new file mode 100644
index 00000000000..ac11139c4aa
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.util;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ListValue;
+import com.google.protobuf.NullValue;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Struct.Builder;
+import com.google.protobuf.Value;
+import com.google.protobuf.util.JsonFormat;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.time.OffsetDateTime;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test for {@link ProtobufAnyValueConverter}.
+ */
+public final class ProtobufAnyValueConverterTest {
+    
+    @Test
+    public void assertConvertToObject() throws InvalidProtocolBufferException {
+        Object actual = ProtobufAnyValueConverter.convertToObject(Any.pack(Int32Value.of(123)));
+        assertThat(actual, is(123));
+        actual = ProtobufAnyValueConverter.convertToObject(Any.pack(Int64Value.of(Long.MAX_VALUE)));
+        assertThat(actual, is(Long.MAX_VALUE));
+        OffsetDateTime now = OffsetDateTime.now();
+        actual = ProtobufAnyValueConverter.convertToObject(Any.pack(com.google.protobuf.Timestamp.newBuilder().setSeconds(now.toEpochSecond()).setNanos(now.getNano()).build()));
+        assertThat(actual, is(Timestamp.valueOf(now.toLocalDateTime())));
+        actual = ProtobufAnyValueConverter.convertToObject(Any.pack(FloatValue.of(1.23F)));
+        assertThat(actual, is(1.23F));
+        actual = ProtobufAnyValueConverter.convertToObject(Any.pack(DoubleValue.of(4.56D)));
+        assertThat(actual, is(4.56D));
+        actual = ProtobufAnyValueConverter.convertToObject(Any.pack(StringValue.of("Hello")));
+        assertThat(actual, is("Hello"));
+        actual = ProtobufAnyValueConverter.convertToObject(Any.pack(BoolValue.of(true)));
+        assertTrue((Boolean) actual);
+        actual = ProtobufAnyValueConverter.convertToObject(Any.pack(BytesValue.of(ByteString.copyFrom(new byte[]{1, 2, 3}))));
+        assertThat(actual, is(new byte[]{1, 2, 3}));
+        // A Struct is converted to its JSON text, so parse the JSON back into a Struct and compare it with the source message.
+        Struct expectedStruct = Struct.newBuilder().putFields("str", Value.newBuilder().setStringValue("ABC defg").build())
+                .putFields("null", Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build())
+                .putFields("number", Value.newBuilder().setNumberValue(123.45D).build())
+                .putFields("list", Value.newBuilder().setListValue(ListValue.newBuilder().addValues(Value.newBuilder().setNumberValue(1)).build()).build()).build();
+        Builder actualStruct = Struct.newBuilder();
+        JsonFormat.parser().merge((String) ProtobufAnyValueConverter.convertToObject(Any.pack(expectedStruct)), actualStruct);
+        assertEquals(expectedStruct.toString(), actualStruct.toString());
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
index b351c9e485f..39d418f226e 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
@@ -64,6 +64,7 @@ public final class DataRecordResultConvertUtilTest {
         dataRecord.addColumn(new Column("create_time2", OffsetTime.now(), false, false));
         dataRecord.addColumn(new Column("create_datetime", LocalDateTime.now(), false, false));
         dataRecord.addColumn(new Column("create_datetime2", OffsetDateTime.now(), false, false));
+        dataRecord.addColumn(new Column("empty", null, false, false));
         Blob mockedBlob = mock(Blob.class);
         when(mockedBlob.getBytes(anyLong(), anyInt())).thenReturn(new byte[]{-1, 0, 1});
         dataRecord.addColumn(new Column("data_blob", mockedBlob, false, false));
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 8c34fef523a..a70dcb628f5 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -35,12 +35,15 @@ import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTabl
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
@@ -164,8 +167,10 @@ public final class CDCE2EIT {
     }
     
     private void startCDCClient(final PipelineContainerComposer containerComposer) {
-        // TODO fix later
-        StartCDCClientParameter parameter = new StartCDCClientParameter(records -> log.info("records: {}", records));
+        DataSource dataSource = StorageContainerUtil.generateDataSource(containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false)),
+                containerComposer.getUsername(), containerComposer.getPassword());
+        DataSourceRecordConsumer dataSourceRecordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
+        StartCDCClientParameter parameter = new StartCDCClientParameter(dataSourceRecordConsumer);
         parameter.setAddress("localhost");
         parameter.setPort(containerComposer.getContainerComposer().getProxyCDCPort());
         parameter.setUsername(ProxyContainerConstants.USERNAME);
@@ -196,8 +201,6 @@ public final class CDCE2EIT {
     }
     
     private static boolean isEnabled() {
-        // TODO xinze fix it
-        // return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new OpenGaussDatabaseType());
-        return false;
+        return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new OpenGaussDatabaseType());
     }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
new file mode 100644
index 00000000000..6fcfef5607a
--- /dev/null
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.cdc;
+
+import com.google.common.base.Strings;
+import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.ProtobufAnyValueConverter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.MetaData;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.TableColumn;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.SQLBuilderUtil;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Record consumer which replays received CDC records into a data source through JDBC.
+ *
+ * <p>UPDATE and DELETE statements locate the target row by the {@code order_id} column.</p>
+ */
+@Slf4j
+public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
+    
+    private final DataSource dataSource;
+    
+    // Cache of loaded table meta data, keyed by the (optionally schema-qualified) table name.
+    private final Map<String, PipelineTableMetaData> tableMetaDataMap;
+    
+    private final StandardPipelineTableMetaDataLoader loader;
+    
+    public DataSourceRecordConsumer(final DataSource dataSource, final DatabaseType databaseType) {
+        this.dataSource = dataSource;
+        tableMetaDataMap = new ConcurrentHashMap<>();
+        loader = new StandardPipelineTableMetaDataLoader(new PipelineDataSourceWrapper(dataSource, databaseType));
+    }
+    
+    /**
+     * Write the records to the data source.
+     *
+     * <p>A list made up only of INSERT records is written as one JDBC batch; any other list is written record by record, in list order.</p>
+     *
+     * @param records records to be written
+     */
+    @Override
+    public void accept(final List<Record> records) {
+        if (records.isEmpty()) {
+            // Nothing to write; the batch path reads the first record and would fail on an empty list.
+            return;
+        }
+        if (records.stream().allMatch(each -> DataChangeType.INSERT == each.getDataChangeType())) {
+            batchInsertRecords(records);
+            return;
+        }
+        for (Record each : records) {
+            write(each);
+        }
+    }
+    
+    /**
+     * Insert all records with a single JDBC batch.
+     *
+     * <p>The INSERT statement and the table meta data are derived from the first record only.
+     * NOTE(review): this assumes every record in the list targets the same table with the same column order - confirm against the CDC server.</p>
+     *
+     * @param records non-empty list of INSERT records
+     */
+    private void batchInsertRecords(final List<Record> records) {
+        Record firstRecord = records.get(0);
+        MetaData metaData = firstRecord.getMetaData();
+        PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
+        List<String> columnNames = firstRecord.getAfterList().stream().map(TableColumn::getName).collect(Collectors.toList());
+        String tableName = buildTableNameWithSchema(metaData.getSchema(), metaData.getTable());
+        String insertSQL = SQLBuilderUtil.buildInsertSQL(columnNames, tableName);
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
+            for (Record each : records) {
+                setAfterColumnValues(preparedStatement, tableMetaData, each);
+                preparedStatement.addBatch();
+            }
+            preparedStatement.executeBatch();
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+    
+    /**
+     * Write one record with its own connection and statement; the statement is executed exactly once.
+     *
+     * @param record record to be written
+     */
+    private void write(final Record record) {
+        String sql = buildSQL(record);
+        MetaData metaData = record.getMetaData();
+        PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            Map<String, TableColumn> afterMap = new LinkedHashMap<>(record.getAfterCount(), 1);
+            record.getAfterList().forEach(each -> afterMap.put(each.getName(), each));
+            switch (record.getDataChangeType()) {
+                case INSERT:
+                    setAfterColumnValues(preparedStatement, tableMetaData, record);
+                    preparedStatement.execute();
+                    break;
+                case UPDATE:
+                    setAfterColumnValues(preparedStatement, tableMetaData, record);
+                    // The last parameter is the order_id of the WHERE clause.
+                    preparedStatement.setObject(record.getAfterCount() + 1, convertValueFromAny(tableMetaData, afterMap.get("order_id")));
+                    int updateCount = preparedStatement.executeUpdate();
+                    if (1 != updateCount) {
+                        log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}", updateCount, sql, afterMap.keySet());
+                    }
+                    break;
+                case DELETE:
+                    // NOTE(review): the key is read from the after-image; confirm DELETE events carry order_id there.
+                    Object orderId = convertValueFromAny(tableMetaData, afterMap.get("order_id"));
+                    preparedStatement.setObject(1, orderId);
+                    preparedStatement.execute();
+                    break;
+                default:
+                    // Not reached: buildSQL has already rejected every other change type.
+                    break;
+            }
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+    
+    /**
+     * Bind the after-image column values of the record to parameters 1..n of the statement, in list order.
+     */
+    private void setAfterColumnValues(final PreparedStatement preparedStatement, final PipelineTableMetaData tableMetaData, final Record record) throws SQLException {
+        List<TableColumn> afterColumns = record.getAfterList();
+        for (int i = 0; i < afterColumns.size(); i++) {
+            preparedStatement.setObject(i + 1, convertValueFromAny(tableMetaData, afterColumns.get(i)));
+        }
+    }
+    
+    /**
+     * Get the table meta data from the cache, loading it through the loader on first use.
+     */
+    private PipelineTableMetaData loadTableMetaData(final String schemaName, final String tableName) {
+        return tableMetaDataMap.computeIfAbsent(buildTableNameWithSchema(schemaName, tableName), key -> loader.getTableMetaData(Strings.emptyToNull(schemaName), tableName));
+    }
+    
+    /**
+     * Build {@code schema.table}, or just the table name when the schema is empty.
+     */
+    private String buildTableNameWithSchema(final String schema, final String tableName) {
+        return schema.isEmpty() ? tableName : String.join(".", schema, tableName);
+    }
+    
+    /**
+     * Build the parameterized SQL matching the change type of the record.
+     *
+     * @param record record
+     * @return INSERT, UPDATE or DELETE SQL
+     * @throws UnsupportedOperationException if the change type is none of INSERT, UPDATE and DELETE
+     */
+    private String buildSQL(final Record record) {
+        List<String> columnNames = record.getAfterList().stream().map(TableColumn::getName).collect(Collectors.toList());
+        MetaData metaData = record.getMetaData();
+        String tableName = buildTableNameWithSchema(metaData.getSchema(), metaData.getTable());
+        switch (record.getDataChangeType()) {
+            case INSERT:
+                return SQLBuilderUtil.buildInsertSQL(columnNames, tableName);
+            case UPDATE:
+                return SQLBuilderUtil.buildUpdateSQL(columnNames, tableName, "?");
+            case DELETE:
+                return SQLBuilderUtil.buildDeleteSQL(tableName, "order_id");
+            default:
+                throw new UnsupportedOperationException();
+        }
+    }
+    
+    /**
+     * Unpack the protobuf value of the column and adapt it to the JDBC type of the target column.
+     *
+     * <p>For a column of type TIME the unpacked {@code Long} is read as nanoseconds of the day, for a column of type DATE as an epoch day;
+     * every other value is returned as unpacked.</p>
+     *
+     * @param tableMetaData meta data of the target table
+     * @param tableColumn received column
+     * @return value to be bound to the statement, may be null
+     */
+    private Object convertValueFromAny(final PipelineTableMetaData tableMetaData, final TableColumn tableColumn) {
+        PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(tableColumn.getName());
+        Object result;
+        try {
+            result = ProtobufAnyValueConverter.convertToObject(tableColumn.getValue());
+        } catch (final InvalidProtocolBufferException ex) {
+            log.error("invalid protocol message value: {}", tableColumn.getValue());
+            throw new RuntimeException(ex);
+        }
+        if (null == result) {
+            return null;
+        }
+        switch (columnMetaData.getDataType()) {
+            case Types.TIME:
+                if ("TIME".equalsIgnoreCase(columnMetaData.getDataTypeName())) {
+                    // Time.valueOf() will lose nanos
+                    return LocalTime.ofNanoOfDay((Long) result);
+                }
+                return result;
+            case Types.DATE:
+                if ("DATE".equalsIgnoreCase(columnMetaData.getDataTypeName())) {
+                    LocalDate localDate = LocalDate.ofEpochDay((Long) result);
+                    return Date.valueOf(localDate);
+                }
+                return result;
+            default:
+                return result;
+        }
+    }
+}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index eadb185cdf7..e8a67c75c5f 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.SQLBuilderUtil;
 
 import javax.sql.DataSource;
 import java.math.BigDecimal;
@@ -87,35 +88,20 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
     private void insertOrder(final Object[] orderInsertData) {
         String sql;
         if (databaseType instanceof MySQLDatabaseType) {
-            sql = buildInsertSQL(MYSQL_COLUMN_NAMES);
+            sql = SQLBuilderUtil.buildInsertSQL(MYSQL_COLUMN_NAMES, orderTableName);
         } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
-            sql = buildInsertSQL(POSTGRESQL_COLUMN_NAMES);
+            sql = SQLBuilderUtil.buildInsertSQL(POSTGRESQL_COLUMN_NAMES, orderTableName);
         } else {
             throw new UnsupportedOperationException();
         }
         DataSourceExecuteUtil.execute(dataSource, sql, orderInsertData);
     }
     
-    private String buildInsertSQL(final List<String> columnNames) {
-        StringBuilder sql = new StringBuilder("INSERT INTO %s (");
-        for (String each : columnNames) {
-            sql.append(each).append(",");
-        }
-        sql.setLength(sql.length() - 1);
-        sql.append(") ").append("VALUES").append("(");
-        for (int i = 0; i < columnNames.size(); i++) {
-            sql.append("?,");
-        }
-        sql.setLength(sql.length() - 1);
-        sql.append(")");
-        return String.format(sql.toString(), orderTableName);
-    }
-    
     private void updateOrderById(final Object orderId) {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int randomInt = random.nextInt(-100, 100);
         if (databaseType instanceof MySQLDatabaseType) {
-            String sql = String.format(buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), "?"), orderTableName);
+            String sql = SQLBuilderUtil.buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), orderTableName, "?");
             log.info("update sql: {}", sql);
             int randomUnsignedInt = random.nextInt(10, 100);
             LocalDateTime now = LocalDateTime.now();
@@ -125,7 +111,7 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
             return;
         }
         if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
-            String sql = String.format(buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), "?"), orderTableName);
+            String sql = SQLBuilderUtil.buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), orderTableName, "?");
             log.info("update sql: {}", sql);
             DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
                     PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
@@ -133,28 +119,18 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
         }
     }
     
-    private String buildUpdateSQL(final List<String> columnNames, final String placeholder) {
-        StringBuilder sql = new StringBuilder("UPDATE %s SET ");
-        for (String each : columnNames) {
-            sql.append(each).append("=").append(placeholder).append(",");
-        }
-        sql.setLength(sql.length() - 1);
-        sql.append(" WHERE order_id=?");
-        return sql.toString();
-    }
-    
     private List<String> ignoreShardingColumns(final List<String> columnNames) {
         return new ArrayList<>(columnNames.subList(2, columnNames.size()));
     }
     
     private void deleteOrderById(final Object orderId) {
-        String sql = String.format("DELETE FROM %s WHERE order_id = ?", orderTableName);
+        String sql = SQLBuilderUtil.buildDeleteSQL(orderTableName, "order_id");
         DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{orderId});
     }
     
     private void setNullToAllFields(final Object orderId) {
         if (databaseType instanceof MySQLDatabaseType) {
-            String sql = String.format(buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), "null"), orderTableName);
+            String sql = SQLBuilderUtil.buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), orderTableName, "null");
             DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{orderId});
         }
     }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/SQLBuilderUtil.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/SQLBuilderUtil.java
new file mode 100644
index 00000000000..11cda8f7661
--- /dev/null
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/SQLBuilderUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.util;
+
+import java.util.List;
+
+/**
+ * SQL builder util.
+ *
+ * <p>Table and column names are concatenated into the SQL as given, without quoting or escaping.</p>
+ */
+public final class SQLBuilderUtil {
+    
+    private SQLBuilderUtil() {
+    }
+    
+    /**
+     * Build insert SQL.
+     *
+     * <p>The result has the form {@code INSERT INTO t (a,b) VALUES(?,?)}, with one {@code ?} per column.</p>
+     *
+     * @param columnNames column names
+     * @param tableName table name
+     * @return insert SQL
+     */
+    public static String buildInsertSQL(final List<String> columnNames, final String tableName) {
+        // Concatenate directly instead of going through String.format, so a '%' inside a column name is kept literally.
+        StringBuilder result = new StringBuilder("INSERT INTO ").append(tableName).append(" (");
+        result.append(String.join(",", columnNames));
+        result.append(") ").append("VALUES").append("(");
+        for (int i = 0; i < columnNames.size(); i++) {
+            result.append(0 == i ? "?" : ",?");
+        }
+        return result.append(")").toString();
+    }
+    
+    /**
+     * Build update SQL.
+     *
+     * <p>The result has the form {@code UPDATE t SET a=?,b=? WHERE order_id=?}; the {@code order_id} condition is fixed.</p>
+     *
+     * @param columnNames column names
+     * @param tableName table name
+     * @param placeholder placeholder assigned to every column, for example {@code ?} or {@code null}
+     * @return update SQL
+     */
+    public static String buildUpdateSQL(final List<String> columnNames, final String tableName, final String placeholder) {
+        // Concatenate directly instead of going through String.format, so a '%' inside a column name or the placeholder is kept literally.
+        StringBuilder result = new StringBuilder("UPDATE ").append(tableName).append(" SET ");
+        String separator = "";
+        for (String each : columnNames) {
+            result.append(separator).append(each).append("=").append(placeholder);
+            separator = ",";
+        }
+        return result.append(" WHERE order_id=?").toString();
+    }
+    
+    /**
+     * Build delete SQL.
+     *
+     * @param tableName table name
+     * @param primaryKeyName primary key name
+     * @return delete SQL
+     */
+    public static String buildDeleteSQL(final String tableName, final String primaryKeyName) {
+        return String.format("DELETE FROM %s WHERE %s = ?", tableName, primaryKeyName);
+    }
+}