Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/26 07:29:03 UTC

[shardingsphere] branch master updated: Refactor data record merge at pipeline (#25307)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7195cdf91a4 Refactor data record merge at pipeline (#25307)
7195cdf91a4 is described below

commit 7195cdf91a445d9de521328d27d1a289a05669cf
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Apr 26 15:28:46 2023 +0800

    Refactor data record merge at pipeline (#25307)
    
    * Refactor data record merge at pipeline
    
    * Combine DataRecord merge and group
    
    * Combine DataRecord merge and group
    
    * improve error reason
---
 .../pipeline/api/ingest/record/DataRecord.java     |   2 +-
 .../api/ingest/record/GroupedDataRecord.java       |   8 +-
 .../job/PipelineImporterJobWriteException.java     |   4 +
 .../pipeline/core/importer/DataRecordMerger.java   | 156 +++++-----------
 .../pipeline/core/importer/DataSourceImporter.java |  73 ++++++--
 .../core/importer/DataRecordMergerTest.java        | 202 +++++++--------------
 .../pipeline/cases/task/E2EIncrementalTask.java    |  15 +-
 .../core/importer/DataSourceImporterTest.java      |   1 +
 8 files changed, 190 insertions(+), 271 deletions(-)
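
For orientation before reading the diff: DataRecordMerger.merge() is removed, and group() now partitions each table's records into batch insert/update/delete lists plus a nonBatchRecords list holding records whose key occurs more than once in the batch; DataSourceImporter still flushes the batch lists in bulk and replays the non-batch records one by one via the new sequentialFlush(). A minimal, self-contained sketch of that partitioning idea, using illustrative stand-in types (Row, Type) rather than the project's DataRecord/GroupedDataRecord API:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: records whose key occurs more than once keep their original
// order and are flushed sequentially; unique keys can be batched per table/type.
public final class GroupingSketch {
    
    enum Type { INSERT, UPDATE, DELETE }
    
    record Row(String table, String key, Type type) {
    }
    
    static void group(final List<Row> rows) {
        // Count key occurrences to detect records that must preserve ordering.
        Map<String, Integer> keyCounts = new LinkedHashMap<>();
        rows.forEach(each -> keyCounts.merge(each.key(), 1, Integer::sum));
        Map<String, List<Row>> batchable = new LinkedHashMap<>();
        List<Row> sequential = new ArrayList<>();
        for (Row each : rows) {
            if (keyCounts.get(each.key()) > 1) {
                sequential.add(each);
            } else {
                batchable.computeIfAbsent(each.table() + "#" + each.type(), ignored -> new ArrayList<>()).add(each);
            }
        }
        System.out.println("batchable=" + batchable);
        System.out.println("sequential=" + sequential);
    }
    
    public static void main(final String[] args) {
        group(List.of(
                new Row("t1", "1", Type.INSERT),
                new Row("t1", "1", Type.UPDATE),
                new Row("t1", "2", Type.INSERT),
                new Row("t2", "3", Type.DELETE)));
    }
}

Running this, the two rows sharing key "1" land in the sequential list in arrival order, while the remaining rows stay batchable per table and operation, which mirrors how the importer now falls back to row-by-row writes only where per-key ordering matters.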

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index c9cb71f7f6a..956887c95f7 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -33,7 +33,7 @@ import java.util.List;
  */
 @Getter
 @Setter
-@EqualsAndHashCode(of = {"tableName", "uniqueKeyValue"}, callSuper = false)
+@EqualsAndHashCode(of = "tableName", callSuper = false)
 @ToString
 public final class DataRecord extends Record {
     
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java
index d0ae80aaaec..4cb8b94939d 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java
@@ -30,9 +30,11 @@ public final class GroupedDataRecord {
     
     private final String tableName;
     
-    private final List<DataRecord> insertDataRecords;
+    private final List<DataRecord> batchInsertDataRecords;
     
-    private final List<DataRecord> updateDataRecords;
+    private final List<DataRecord> batchUpdateDataRecords;
     
-    private final List<DataRecord> deleteDataRecords;
+    private final List<DataRecord> batchDeleteDataRecords;
+    
+    private final List<DataRecord> nonBatchRecords;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
index b0f832722f7..48cf2130eea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
@@ -30,4 +30,8 @@ public final class PipelineImporterJobWriteException extends PipelineSQLExceptio
     public PipelineImporterJobWriteException(final Exception cause) {
         super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data failed.", cause);
     }
+    
+    public PipelineImporterJobWriteException(final String reason, final Exception cause) {
+        super(XOpenSQLState.GENERAL_ERROR, 91, reason, cause);
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
index 9b7ba2663e4..42dd3f29152 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
@@ -17,20 +17,21 @@
 
 package org.apache.shardingsphere.data.pipeline.core.importer;
 
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord.Key;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -38,37 +39,6 @@ import java.util.stream.Collectors;
  */
 public final class DataRecordMerger {
     
-    /**
-     * Merge data record.
-     * <pre>
-     * insert + insert -&gt; exception
-     * update + insert -&gt; exception
-     * delete + insert -&gt; insert
-     * insert + update -&gt; insert
-     * update + update -&gt; update
-     * delete + update -&gt; exception
-     * insert + delete -&gt; delete
-     * update + delete -&gt; delete
-     * delete + delete -&gt; exception
-     * </pre>
-     *
-     * @param dataRecords data records
-     * @return merged data records
-     */
-    public List<DataRecord> merge(final List<DataRecord> dataRecords) {
-        Map<DataRecord.Key, DataRecord> result = new HashMap<>();
-        dataRecords.forEach(each -> {
-            if (IngestDataChangeType.INSERT.equals(each.getType())) {
-                mergeInsert(each, result);
-            } else if (IngestDataChangeType.UPDATE.equals(each.getType())) {
-                mergeUpdate(each, result);
-            } else if (IngestDataChangeType.DELETE.equals(each.getType())) {
-                mergeDelete(each, result);
-            }
-        });
-        return new ArrayList<>(result.values());
-    }
-    
     /**
      * Group by table and type.
      *
@@ -76,90 +46,50 @@ public final class DataRecordMerger {
      * @return grouped data records
      */
     public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
-        List<GroupedDataRecord> result = new ArrayList<>(100);
-        List<DataRecord> mergedDataRecords = dataRecords.get(0).getUniqueKeyValue().isEmpty() ? dataRecords : merge(dataRecords);
-        Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
-        for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
-            Map<String, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
-            result.add(new GroupedDataRecord(entry.getKey(), typeGroup.get(IngestDataChangeType.INSERT), typeGroup.get(IngestDataChangeType.UPDATE), typeGroup.get(IngestDataChangeType.DELETE)));
-        }
-        return result;
-    }
-    
-    private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
-        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
-        ShardingSpherePreconditions.checkState(null == beforeDataRecord
-                || IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
-        dataRecords.put(dataRecord.getKey(), dataRecord);
-    }
-    
-    private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
-        DataRecord beforeDataRecord = checkUpdatedPrimaryKey(dataRecord) ? dataRecords.get(dataRecord.getOldKey()) : dataRecords.get(dataRecord.getKey());
-        if (null == beforeDataRecord) {
-            dataRecords.put(dataRecord.getKey(), dataRecord);
-            return;
-        }
-        ShardingSpherePreconditions.checkState(!IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new UnsupportedSQLOperationException("Not Delete"));
-        if (checkUpdatedPrimaryKey(dataRecord)) {
-            dataRecords.remove(dataRecord.getOldKey());
-        }
-        if (IngestDataChangeType.INSERT.equals(beforeDataRecord.getType())) {
-            DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
-            mergedDataRecord.setTableName(dataRecord.getTableName());
-            mergedDataRecord.setType(IngestDataChangeType.INSERT);
-            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
-            return;
+        int insertCount = 0;
+        Map<Key, Boolean> duplicateKeyMap = new HashMap<>();
+        Set<String> tableNames = new LinkedHashSet<>();
+        for (DataRecord each : dataRecords) {
+            if (IngestDataChangeType.INSERT.equals(each.getType())) {
+                insertCount++;
+            }
+            tableNames.add(each.getTableName());
+            Key key = getKeyFromDataRecord(each);
+            if (duplicateKeyMap.containsKey(key)) {
+                duplicateKeyMap.put(key, true);
+                continue;
+            }
+            duplicateKeyMap.put(key, false);
         }
-        if (IngestDataChangeType.UPDATE.equals(beforeDataRecord.getType())) {
-            DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
-            mergedDataRecord.setTableName(dataRecord.getTableName());
-            mergedDataRecord.setType(IngestDataChangeType.UPDATE);
-            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+        List<GroupedDataRecord> result = new ArrayList<>(100);
+        if (insertCount == dataRecords.size()) {
+            Map<String, List<DataRecord>> tableGroup = dataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
+            for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
+                result.add(new GroupedDataRecord(entry.getKey(), entry.getValue(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+            }
+            return result;
         }
-    }
-    
-    private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
-        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
-        ShardingSpherePreconditions.checkState(null == beforeDataRecord
-                || !IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
-        if (null != beforeDataRecord && IngestDataChangeType.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedPrimaryKey(beforeDataRecord)) {
-            DataRecord mergedDataRecord = new DataRecord(dataRecord.getPosition(), dataRecord.getColumnCount());
-            for (int i = 0; i < dataRecord.getColumnCount(); i++) {
-                mergedDataRecord.addColumn(new Column(dataRecord.getColumn(i).getName(),
-                        dataRecord.getColumn(i).isUniqueKey() ? beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(), true, dataRecord.getColumn(i).isUniqueKey()));
+        Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
+        Map<String, Map<String, List<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
+        for (DataRecord each : dataRecords) {
+            Key key = getKeyFromDataRecord(each);
+            if (duplicateKeyMap.getOrDefault(key, false)) {
+                nonBatchRecords.computeIfAbsent(each.getTableName(), ignored -> new LinkedList<>()).add(each);
+                continue;
             }
-            mergedDataRecord.setTableName(dataRecord.getTableName());
-            mergedDataRecord.setType(IngestDataChangeType.DELETE);
-            dataRecords.remove(beforeDataRecord.getKey());
-            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
-        } else {
-            dataRecords.put(dataRecord.getOldKey(), dataRecord);
+            Map<String, List<DataRecord>> recordMap = batchDataRecords.computeIfAbsent(each.getTableName(), ignored -> new HashMap<>());
+            recordMap.computeIfAbsent(each.getType(), ignored -> new LinkedList<>()).add(each);
         }
-    }
-    
-    private boolean checkUpdatedPrimaryKey(final DataRecord dataRecord) {
-        return RecordUtils.extractPrimaryColumns(dataRecord).stream().anyMatch(Column::isUpdated);
-    }
-    
-    private DataRecord mergeColumn(final DataRecord preDataRecord, final DataRecord curDataRecord) {
-        DataRecord result = new DataRecord(curDataRecord.getPosition(), curDataRecord.getColumnCount());
-        for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
-            result.addColumn(new Column(
-                    curDataRecord.getColumn(i).getName(),
-                    preDataRecord.getColumn(i).isUniqueKey()
-                            ? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
-                            : null,
-                    curDataRecord.getColumn(i).getValue(),
-                    preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
-                    curDataRecord.getColumn(i).isUniqueKey()));
+        for (String each : tableNames) {
+            Map<String, List<DataRecord>> batchMap = batchDataRecords.getOrDefault(each, Collections.emptyMap());
+            List<DataRecord> nonBatchRecordMap = nonBatchRecords.getOrDefault(each, Collections.emptyList());
+            result.add(new GroupedDataRecord(each, batchMap.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
+                    batchMap.getOrDefault(IngestDataChangeType.UPDATE, Collections.emptyList()), batchMap.getOrDefault(IngestDataChangeType.DELETE, Collections.emptyList()), nonBatchRecordMap));
         }
         return result;
     }
     
-    private Object mergePrimaryKeyOldValue(final Column beforeColumn, final Column column) {
-        if (beforeColumn.isUpdated()) {
-            return beforeColumn.getOldValue();
-        }
-        return column.isUpdated() ? column.getOldValue() : null;
+    private Key getKeyFromDataRecord(final DataRecord dataRecord) {
+        return IngestDataChangeType.DELETE.equals(dataRecord.getType()) ? dataRecord.getOldKey() : dataRecord.getKey();
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 83692765b88..1ba4b49f1f5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -48,10 +48,13 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Default importer.
@@ -119,9 +122,10 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         }
         List<GroupedDataRecord> result = MERGER.group(dataRecords);
         for (GroupedDataRecord each : result) {
-            flushInternal(dataSource, each.getDeleteDataRecords());
-            flushInternal(dataSource, each.getInsertDataRecords());
-            flushInternal(dataSource, each.getUpdateDataRecords());
+            flushInternal(dataSource, each.getBatchDeleteDataRecords());
+            flushInternal(dataSource, each.getBatchInsertDataRecords());
+            flushInternal(dataSource, each.getBatchUpdateDataRecords());
+            sequentialFlush(dataSource, each.getNonBatchRecords());
         }
         return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
     }
@@ -130,15 +134,11 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         if (null == buffer || buffer.isEmpty()) {
             return;
         }
-        try {
-            tryFlush(dataSource, buffer);
-        } catch (final SQLException ex) {
-            throw new PipelineImporterJobWriteException(ex);
-        }
+        tryFlush(dataSource, buffer);
     }
     
     @SneakyThrows(InterruptedException.class)
-    private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
+    private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
         for (int i = 0; isRunning() && i <= importerConfig.getRetryTimes(); i++) {
             try {
                 doFlush(dataSource, buffer);
@@ -146,7 +146,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
             } catch (final SQLException ex) {
                 log.error("flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
                 if (i == importerConfig.getRetryTimes()) {
-                    throw ex;
+                    throw new PipelineImporterJobWriteException(ex);
                 }
                 Thread.sleep(Math.min(5 * 60 * 1000L, 1000L << i));
             }
@@ -182,6 +182,30 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         }
     }
     
+    private void doFlush(final Connection connection, final DataRecord each) throws SQLException {
+        switch (each.getType()) {
+            case IngestDataChangeType.INSERT:
+                if (null != rateLimitAlgorithm) {
+                    rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+                }
+                executeBatchInsert(connection, Collections.singletonList(each));
+                break;
+            case IngestDataChangeType.UPDATE:
+                if (null != rateLimitAlgorithm) {
+                    rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
+                }
+                executeUpdate(connection, each);
+                break;
+            case IngestDataChangeType.DELETE:
+                if (null != rateLimitAlgorithm) {
+                    rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
+                }
+                executeBatchDelete(connection, Collections.singletonList(each));
+                break;
+            default:
+        }
+    }
+    
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
         String insertSql = pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
@@ -243,16 +267,41 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
             for (DataRecord each : dataRecords) {
                 conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
                 for (int i = 0; i < conditionColumns.size(); i++) {
-                    preparedStatement.setObject(i + 1, conditionColumns.get(i).getOldValue());
+                    Object oldValue = conditionColumns.get(i).getOldValue();
+                    if (null == oldValue) {
+                        log.warn("Record old value is null, record={}", each);
+                    }
+                    preparedStatement.setObject(i + 1, oldValue);
                 }
                 preparedStatement.addBatch();
             }
-            preparedStatement.executeBatch();
+            int[] counts = preparedStatement.executeBatch();
+            if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
+                log.warn("batchDelete failed, counts={}, sql={}", Arrays.toString(counts), deleteSQL);
+            }
         } finally {
             batchDeleteStatement = null;
         }
     }
     
+    private void sequentialFlush(final DataSource dataSource, final List<DataRecord> buffer) {
+        if (buffer.isEmpty()) {
+            return;
+        }
+        try (Connection connection = dataSource.getConnection()) {
+            // TODO It would be better to use a transaction, but on PostgreSQL a delete executed inside an open transaction sometimes has no effect
+            for (DataRecord each : buffer) {
+                try {
+                    doFlush(connection, each);
+                } catch (final SQLException ex) {
+                    throw new PipelineImporterJobWriteException(String.format("Write failed, record=%s", each), ex);
+                }
+            }
+        } catch (final SQLException ex) {
+            throw new PipelineImporterJobWriteException(ex);
+        }
+    }
+    
     @Override
     protected void doStop() throws SQLException {
         cancelStatement(batchInsertStatement);
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
index fa8c7c5ca33..93f6cc62a77 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
@@ -21,213 +21,143 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPo
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 class DataRecordMergerTest {
     
     private final DataRecordMerger dataRecordMerger = new DataRecordMerger();
     
-    private DataRecord beforeDataRecord;
-    
-    private DataRecord afterDataRecord;
-    
-    private Collection<DataRecord> actual;
-    
-    @Test
-    void assertInsertBeforeInsert() {
-        beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
-    }
-    
-    @Test
-    void assertUpdateBeforeInsert() {
-        beforeDataRecord = mockUpdateDataRecord(1, 2, 2);
-        afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
-    }
-    
     @Test
     void assertDeleteBeforeInsert() {
-        beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
-        afterDataRecord = mockInsertDataRecord(1, 1, 1);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
+        DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertInsertBeforeUpdate() {
-        beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.INSERT));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(1));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertInsertBeforeUpdatePrimaryKey() {
-        beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 1, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.INSERT));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(2));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdateBeforeUpdate() {
-        beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(1));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdateBeforeUpdatePrimaryKey() {
-        beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(dataRecord.getColumn(0).getValue(), is(2));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
-    }
-    
-    @Test
-    void assertUpdatePrimaryKeyBeforeUpdate() {
-        beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(2, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
-        assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(dataRecord.getColumn(0).getValue(), is(2));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+        assertDataRecordsMatched(actual.iterator().next().getBatchUpdateDataRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
-        beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
-        assertThat(dataRecord.getColumn(0).getValue(), is(3));
-        assertThat(dataRecord.getColumn(1).getValue(), is(2));
-        assertThat(dataRecord.getColumn(2).getValue(), is(2));
-    }
-    
-    @Test
-    void assertDeleteBeforeUpdate() {
-        beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
-        afterDataRecord = mockUpdateDataRecord(1, 2, 2);
-        assertThrows(UnsupportedSQLOperationException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+        assertThat(actual.get(0).getBatchUpdateDataRecords().size(), is(2));
+        GroupedDataRecord actualGroupedDataRecord = actual.get(0);
+        DataRecord actualFirstDataRecord = actualGroupedDataRecord.getBatchUpdateDataRecords().get(0);
+        assertThat(actualFirstDataRecord.getType(), is(IngestDataChangeType.UPDATE));
+        assertThat(actualFirstDataRecord.getTableName(), is("order"));
+        assertThat(actualFirstDataRecord.getColumn(0).getOldValue(), is(1));
+        assertThat(actualFirstDataRecord.getColumn(0).getValue(), is(2));
+        assertThat(actualFirstDataRecord.getColumn(1).getValue(), is(1));
+        assertThat(actualFirstDataRecord.getColumn(2).getValue(), is(1));
+        DataRecord actualSecondDataRecord = actualGroupedDataRecord.getBatchUpdateDataRecords().get(1);
+        assertThat(actualSecondDataRecord.getType(), is(IngestDataChangeType.UPDATE));
+        assertThat(actualSecondDataRecord.getTableName(), is("order"));
+        assertThat(actualSecondDataRecord.getColumn(0).getOldValue(), is(2));
+        assertThat(actualSecondDataRecord.getColumn(0).getValue(), is(3));
+        assertThat(actualSecondDataRecord.getColumn(1).getValue(), is(2));
+        assertThat(actualSecondDataRecord.getColumn(2).getValue(), is(2));
     }
     
     @Test
     void assertInsertBeforeDelete() {
-        beforeDataRecord = mockInsertDataRecord(1, 1, 1);
-        afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdateBeforeDelete() {
-        beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
-        afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
     @Test
     void assertUpdatePrimaryKeyBeforeDelete() {
-        beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
-        afterDataRecord = mockDeleteDataRecord(2, 1, 1);
-        actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
+        List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
         assertThat(actual.size(), is(1));
-        DataRecord dataRecord = actual.iterator().next();
-        assertThat(dataRecord.getType(), is(IngestDataChangeType.DELETE));
-        assertThat(dataRecord.getTableName(), is("order"));
-        assertNull(dataRecord.getColumn(0).getOldValue());
-        assertThat(dataRecord.getColumn(0).getValue(), is(1));
-        assertThat(dataRecord.getColumn(1).getValue(), is(1));
-        assertThat(dataRecord.getColumn(2).getValue(), is(1));
+        assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
     }
     
-    @Test
-    void assertDeleteBeforeDelete() {
-        beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
-        afterDataRecord = mockDeleteDataRecord(1, 1, 1);
-        assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+    private void assertDataRecordsMatched(final List<DataRecord> actualRecords, final List<DataRecord> expectedRecords) {
+        for (int i = 0; i < actualRecords.size(); i++) {
+            assertThat(actualRecords.get(i), sameInstance(expectedRecords.get(i)));
+        }
     }
     
     @Test
     void assertGroup() {
-        List<DataRecord> dataRecords = mockDataRecords();
-        List<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
-        assertThat(groupedDataRecords.size(), is(2));
-        assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
-        assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
-        assertThat(groupedDataRecords.get(0).getInsertDataRecords().size(), is(1));
-        assertThat(groupedDataRecords.get(0).getUpdateDataRecords().size(), is(1));
-        assertThat(groupedDataRecords.get(0).getDeleteDataRecords().size(), is(1));
-    }
-    
-    private List<DataRecord> mockDataRecords() {
-        return Arrays.asList(
+        List<DataRecord> dataRecords = Arrays.asList(
                 mockInsertDataRecord("t1", 1, 1, 1),
                 mockUpdateDataRecord("t1", 1, 2, 1),
                 mockUpdateDataRecord("t1", 1, 2, 2),
                 mockUpdateDataRecord("t1", 2, 1, 1),
                 mockUpdateDataRecord("t1", 2, 2, 1),
                 mockUpdateDataRecord("t1", 2, 2, 2),
+                mockInsertDataRecord("t1", 10, 10, 10),
                 mockDeleteDataRecord("t1", 3, 1, 1),
                 mockInsertDataRecord("t2", 1, 1, 1));
+        List<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
+        assertThat(groupedDataRecords.size(), is(2));
+        assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
+        assertThat(groupedDataRecords.get(0).getBatchInsertDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getBatchUpdateDataRecords().size(), is(0));
+        assertThat(groupedDataRecords.get(0).getBatchDeleteDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getNonBatchRecords().size(), is(6));
+        assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
+        assertThat(groupedDataRecords.get(1).getBatchInsertDataRecords().size(), is(1));
     }
     
     private DataRecord mockInsertDataRecord(final int id, final int userId, final int totalPrice) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 7298b116ef0..19b9e2b09e9 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -101,20 +101,22 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
         int randomInt = random.nextInt(-100, 100);
         if (databaseType instanceof MySQLDatabaseType) {
             String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), orderTableName, "?");
-            log.info("update sql: {}", sql);
             int randomUnsignedInt = random.nextInt(10, 100);
             LocalDateTime now = LocalDateTime.now();
-            DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
+            Object[] parameters = {"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
                     randomUnsignedInt, 1.0F, 1.0, new BigDecimal("999"), now, now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue() + 1, new byte[]{}, new byte[]{1, 2, -1, -3},
-                    "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId});
+                    "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId};
+            log.info("update sql: {}, params: {}", sql, parameters);
+            DataSourceExecuteUtils.execute(dataSource, sql, parameters);
             return;
         }
         if (databaseType instanceof SchemaSupportedDatabaseType) {
             String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), orderTableName, "?");
-            log.info("update sql: {}", sql);
-            DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
+            Object[] parameters = {"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
                     PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
-                    LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId});
+                    LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId};
+            log.info("update sql: {}, params: {}", sql, parameters);
+            DataSourceExecuteUtils.execute(dataSource, sql, parameters);
         }
     }
     
@@ -124,6 +126,7 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
     
     private void deleteOrderById(final Object orderId) {
         String sql = SQLBuilderUtils.buildDeleteSQL(orderTableName, "order_id");
+        log.info("delete sql: {}, params: {}", sql, orderId);
         DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{orderId});
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
index d5466ba5ee3..dc0140be0d5 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
@@ -106,6 +106,7 @@ class DataSourceImporterTest {
         DataRecord deleteRecord = getDataRecord("DELETE");
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(deleteRecord));
+        when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 1);
         verify(preparedStatement).setObject(2, 10);