You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/13 02:40:08 UTC

[shardingsphere] branch master updated: Merge and batch execute the rows change (#8147)

This is an automated email from the ASF dual-hosted git repository.

zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c168ae5  Merge and batch execute the rows change (#8147)
c168ae5 is described below

commit c168ae562264923e18f5e3507a3c038e3d488f6f
Author: avalon5666 <64...@users.noreply.github.com>
AuthorDate: Fri Nov 13 10:38:50 2020 +0800

    Merge and batch execute the rows change (#8147)
    
    * Fix jdbc dumper judge primary key error
    
    * Merge and batch execute the rows change (#7836)
---
 .../executor/dumper/AbstractJDBCDumper.java        |   2 +-
 .../executor/importer/AbstractJDBCImporter.java    | 106 ++++----
 .../executor/importer/DataRecordMerger.java        | 176 +++++++++++++
 .../importer/UnexpectedDataRecordOrder.java        |  29 +++
 .../core/execute/executor/record/DataRecord.java   |  31 +++
 .../execute/executor/record/GroupedDataRecord.java |  36 +++
 .../importer/AbstractJDBCImporterTest.java         |  15 +-
 .../executor/importer/DataRecordMergerTest.java    | 280 +++++++++++++++++++++
 .../execute/executor/record/DataRecordTest.java    |  57 +++++
 9 files changed, 682 insertions(+), 50 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index 3217946..c6df1c1 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -94,7 +94,7 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
                 record.setType(ScalingConstant.INSERT);
                 record.setTableName(inventoryDumperConfiguration.getTableNameMap().get(inventoryDumperConfiguration.getTableName()));
                 for (int i = 1; i <= metaData.getColumnCount(); i++) {
-                    record.addColumn(new Column(metaData.getColumnName(i), readValue(rs, i), true, tableMetaData.isPrimaryKey(i)));
+                    record.addColumn(new Column(metaData.getColumnName(i), readValue(rs, i), true, tableMetaData.isPrimaryKey(i - 1)));
                 }
                 pushRecord(record);
             }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index 3ca1ba7..8853501 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
 import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
@@ -28,6 +29,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
 import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
@@ -36,9 +38,8 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.sql.SQLIntegrityConstraintViolationException;
-import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Abstract JDBC importer implementation.
@@ -46,6 +47,8 @@ import java.util.List;
 @Slf4j
 public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor<IncrementalPosition> implements Importer {
     
+    private static final DataRecordMerger MERGER = new DataRecordMerger();
+    
     private final ImporterConfiguration importerConfig;
     
     private final DataSourceManager dataSourceManager;
@@ -77,7 +80,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
     @Override
     public final void write() {
         while (isRunning()) {
-            List<Record> records = channel.fetchRecords(100, 3);
+            List<Record> records = channel.fetchRecords(1024, 3);
             if (null != records && !records.isEmpty()) {
                 flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfiguration()), records);
                 if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
@@ -90,65 +93,79 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
     }
     
     private void flush(final DataSource dataSource, final List<Record> buffer) {
+        List<GroupedDataRecord> groupedDataRecords = MERGER.group(buffer.stream()
+                .filter(each -> each instanceof DataRecord)
+                .map(each -> (DataRecord) each)
+                .collect(Collectors.toList()));
+        groupedDataRecords.forEach(each -> {
+            if (CollectionUtils.isNotEmpty(each.getDeleteDataRecords())) {
+                flushInternal(dataSource, each.getDeleteDataRecords());
+            }
+            if (CollectionUtils.isNotEmpty(each.getInsertDataRecords())) {
+                flushInternal(dataSource, each.getInsertDataRecords());
+            }
+            if (CollectionUtils.isNotEmpty(each.getUpdateDataRecords())) {
+                flushInternal(dataSource, each.getUpdateDataRecords());
+            }
+        });
+    }
+    
+    private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
         boolean success = tryFlush(dataSource, buffer);
         if (isRunning() && !success) {
             throw new SyncTaskExecuteException("write failed.");
         }
     }
     
-    private boolean tryFlush(final DataSource dataSource, final List<Record> buffer) {
+    private boolean tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
         int retryTimes = importerConfig.getRetryTimes();
-        List<Record> unflushed = buffer;
         do {
-            unflushed = doFlush(dataSource, unflushed);
-        } while (isRunning() && !unflushed.isEmpty() && retryTimes-- > 0);
-        return unflushed.isEmpty();
+            try {
+                doFlush(dataSource, buffer);
+                return true;
+            } catch (SQLException ex) {
+                log.error("flush failed: ", ex);
+            }
+        } while (isRunning() && retryTimes-- > 0);
+        return false;
     }
     
-    private List<Record> doFlush(final DataSource dataSource, final List<Record> buffer) {
-        int i = 0;
+    private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             connection.setAutoCommit(false);
-            for (; i < buffer.size(); i++) {
-                execute(connection, buffer.get(i));
-            }
-            connection.commit();
-        } catch (final SQLException ex) {
-            log.error("flush failed: {}", buffer.get(i), ex);
-            return buffer.subList(i, buffer.size());
-        }
-        return Collections.emptyList();
-    }
-    
-    private void execute(final Connection connection, final Record record) throws SQLException {
-        if (DataRecord.class.equals(record.getClass())) {
-            DataRecord dataRecord = (DataRecord) record;
-            switch (dataRecord.getType()) {
+            switch (buffer.get(0).getType()) {
                 case ScalingConstant.INSERT:
-                    executeInsert(connection, dataRecord);
+                    executeBatchInsert(connection, buffer);
                     break;
                 case ScalingConstant.UPDATE:
-                    executeUpdate(connection, dataRecord);
+                    executeUpdate(connection, buffer);
                     break;
                 case ScalingConstant.DELETE:
-                    executeDelete(connection, dataRecord);
+                    executeBatchDelete(connection, buffer);
                     break;
                 default:
                     break;
             }
+            connection.commit();
         }
     }
     
-    private void executeInsert(final Connection connection, final DataRecord record) throws SQLException {
-        String insertSql = sqlBuilder.buildInsertSQL(record);
+    private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
+        String insertSql = sqlBuilder.buildInsertSQL(dataRecords.get(0));
         PreparedStatement ps = connection.prepareStatement(insertSql);
         ps.setQueryTimeout(30);
-        try {
-            for (int i = 0; i < record.getColumnCount(); i++) {
-                ps.setObject(i + 1, record.getColumn(i).getValue());
+        for (DataRecord each : dataRecords) {
+            for (int i = 0; i < each.getColumnCount(); i++) {
+                ps.setObject(i + 1, each.getColumn(i).getValue());
             }
-            ps.execute();
-        } catch (final SQLIntegrityConstraintViolationException ignored) {
+            ps.addBatch();
+        }
+        ps.executeBatch();
+    }
+    
+    private void executeUpdate(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
+        for (DataRecord each : dataRecords) {
+            executeUpdate(connection, each);
         }
     }
     
@@ -169,13 +186,18 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
         ps.execute();
     }
     
-    private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
-        List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
-        String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
-        PreparedStatement ps = connection.prepareStatement(deleteSql);
-        for (int i = 0; i < conditionColumns.size(); i++) {
-            ps.setObject(i + 1, conditionColumns.get(i).getValue());
+    private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
+        List<Column> conditionColumns = RecordUtil.extractConditionColumns(dataRecords.get(0), importerConfig.getShardingColumnsMap().get(dataRecords.get(0).getTableName()));
+        String deleteSQL = sqlBuilder.buildDeleteSQL(dataRecords.get(0), conditionColumns);
+        PreparedStatement ps = connection.prepareStatement(deleteSQL);
+        ps.setQueryTimeout(30);
+        for (DataRecord each : dataRecords) {
+            conditionColumns = RecordUtil.extractConditionColumns(each, importerConfig.getShardingColumnsMap().get(each.getTableName()));
+            for (int i = 0; i < conditionColumns.size(); i++) {
+                ps.setObject(i + 1, conditionColumns.get(i).getValue());
+            }
+            ps.addBatch();
         }
-        ps.execute();
+        ps.executeBatch();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMerger.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMerger.java
new file mode 100644
index 0000000..a3d81a1
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMerger.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Data Record merger.
+ */
+public class DataRecordMerger {
+    
+    /**
+     * Merge data record.
+     * insert + insert -> exception
+     * update + insert -> exception
+     * delete + insert -> insert
+     * insert + update -> insert
+     * update + update -> update
+     * delete + update -> exception
+     * insert + delete -> delete
+     * update + delete -> delete
+     * delete + delete -> exception
+     *
+     * @param dataRecords data records
+     * @return merged data records
+     */
+    public List<DataRecord> merge(final List<DataRecord> dataRecords) {
+        Map<DataRecord.Key, DataRecord> result = new HashMap<>();
+        dataRecords.forEach(dataRecord -> {
+            if (ScalingConstant.INSERT.equals(dataRecord.getType())) {
+                mergeInsert(dataRecord, result);
+            } else if (ScalingConstant.UPDATE.equals(dataRecord.getType())) {
+                mergeUpdate(dataRecord, result);
+            } else if (ScalingConstant.DELETE.equals(dataRecord.getType())) {
+                mergeDelete(dataRecord, result);
+            }
+        });
+        return new ArrayList<>(result.values());
+    }
+    
+    /**
+     * Group by table and type.
+     *
+     * @param dataRecords data records
+     * @return grouped data records
+     */
+    public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
+        List<DataRecord> mergedDataRecords = merge(dataRecords);
+        List<GroupedDataRecord> result = new ArrayList<>(100);
+        Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
+        for (Map.Entry<String, List<DataRecord>> each : tableGroup.entrySet()) {
+            Map<String, List<DataRecord>> typeGroup = each.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
+            result.add(new GroupedDataRecord(each.getKey(),
+                    typeGroup.get(ScalingConstant.INSERT),
+                    typeGroup.get(ScalingConstant.UPDATE),
+                    typeGroup.get(ScalingConstant.DELETE)));
+        }
+        return result;
+    }
+    
+    private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
+        if (null != beforeDataRecord && !ScalingConstant.DELETE.equals(beforeDataRecord.getType())) {
+            throw new UnexpectedDataRecordOrder(beforeDataRecord, dataRecord);
+        }
+        dataRecords.put(dataRecord.getKey(), dataRecord);
+    }
+    
+    private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+        DataRecord beforeDataRecord = checkUpdatedPrimaryKey(dataRecord)
+                ? dataRecords.get(dataRecord.getOldKey())
+                : dataRecords.get(dataRecord.getKey());
+        if (null == beforeDataRecord) {
+            dataRecords.put(dataRecord.getKey(), dataRecord);
+            return;
+        }
+        if (ScalingConstant.DELETE.equals(beforeDataRecord.getType())) {
+            throw new UnsupportedOperationException();
+        }
+        if (checkUpdatedPrimaryKey(dataRecord) && dataRecords.containsKey(dataRecord.getOldKey())) {
+            dataRecords.remove(dataRecord.getOldKey());
+        }
+        if (ScalingConstant.INSERT.equals(beforeDataRecord.getType())) {
+            DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
+            mergedDataRecord.setTableName(dataRecord.getTableName());
+            mergedDataRecord.setType(ScalingConstant.INSERT);
+            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+            return;
+        }
+        if (ScalingConstant.UPDATE.equals(beforeDataRecord.getType())) {
+            DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
+            mergedDataRecord.setTableName(dataRecord.getTableName());
+            mergedDataRecord.setType(ScalingConstant.UPDATE);
+            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+            return;
+        }
+    }
+    
+    private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
+        if (null != beforeDataRecord && (ScalingConstant.DELETE.equals(beforeDataRecord.getType()))) {
+            throw new UnexpectedDataRecordOrder(beforeDataRecord, dataRecord);
+        }
+        if (null != beforeDataRecord && ScalingConstant.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedPrimaryKey(beforeDataRecord)) {
+            // primary key updated + delete
+            DataRecord mergedDataRecord = new DataRecord(dataRecord.getPosition(), dataRecord.getColumnCount());
+            for (int i = 0; i < dataRecord.getColumnCount(); i++) {
+                mergedDataRecord.addColumn(new Column(
+                        dataRecord.getColumn(i).getName(),
+                        dataRecord.getColumn(i).isPrimaryKey()
+                                ? beforeDataRecord.getColumn(i).getOldValue()
+                                : beforeDataRecord.getColumn(i).getValue(),
+                        true,
+                        dataRecord.getColumn(i).isPrimaryKey()
+                ));
+            }
+            mergedDataRecord.setTableName(dataRecord.getTableName());
+            mergedDataRecord.setType(ScalingConstant.DELETE);
+            dataRecords.remove(beforeDataRecord.getKey());
+            dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+        } else {
+            dataRecords.put(dataRecord.getKey(), dataRecord);
+        }
+    }
+    
+    private boolean checkUpdatedPrimaryKey(final DataRecord dataRecord) {
+        return RecordUtil.extractPrimaryColumns(dataRecord).stream().anyMatch(each -> each.isUpdated());
+    }
+    
+    private DataRecord mergeColumn(final DataRecord preDataRecord, final DataRecord curDataRecord) {
+        DataRecord result = new DataRecord(curDataRecord.getPosition(), curDataRecord.getColumnCount());
+        for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
+            result.addColumn(new Column(
+                    curDataRecord.getColumn(i).getName(),
+                    preDataRecord.getColumn(i).isPrimaryKey()
+                            ? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
+                            : null,
+                    curDataRecord.getColumn(i).getValue(),
+                    preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
+                    curDataRecord.getColumn(i).isPrimaryKey()
+            ));
+        }
+        return result;
+    }
+    
+    private Object mergePrimaryKeyOldValue(final Column beforeColumn, final Column column) {
+        return beforeColumn.isUpdated()
+                ? beforeColumn.getOldValue()
+                : (column.isUpdated() ? column.getOldValue() : null);
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/UnexpectedDataRecordOrder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/UnexpectedDataRecordOrder.java
new file mode 100644
index 0000000..bad98c2
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/UnexpectedDataRecordOrder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+
+@RequiredArgsConstructor
+public class UnexpectedDataRecordOrder extends RuntimeException {
+    
+    private final DataRecord beforeDataRecord;
+    
+    private final DataRecord afterDataRecord;
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
index e8594c0..c92ffcc 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.execute.executor.record;
 
+import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import org.apache.shardingsphere.scaling.core.job.position.Position;
 
@@ -41,6 +42,8 @@ public final class DataRecord extends Record {
     
     private final List<Object> primaryKeyValue = new LinkedList<>();
     
+    private final List<Object> oldPrimaryKeyValues = new ArrayList<>();
+    
     private String type;
     
     private String tableName;
@@ -59,6 +62,7 @@ public final class DataRecord extends Record {
         columns.add(data);
         if (data.isPrimaryKey()) {
             primaryKeyValue.add(data.getValue());
+            oldPrimaryKeyValues.add(data.getOldValue());
         }
     }
     
@@ -80,4 +84,31 @@ public final class DataRecord extends Record {
     public Column getColumn(final int index) {
         return columns.get(index);
     }
+    
+    /**
+     * Get key.
+     *
+     * @return key
+     */
+    public Key getKey() {
+        return new Key(tableName, primaryKeyValue);
+    }
+    
+    /**
+     * Get old key.
+     *
+     * @return key
+     */
+    public Key getOldKey() {
+        return new Key(tableName, oldPrimaryKeyValues);
+    }
+    
+    @EqualsAndHashCode
+    @RequiredArgsConstructor
+    public static class Key {
+        
+        private final String tableName;
+        
+        private final List<Object> primaryKeyValues;
+    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/GroupedDataRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/GroupedDataRecord.java
new file mode 100644
index 0000000..f63d179
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/GroupedDataRecord.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.record;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.List;
+
+@Getter
+@RequiredArgsConstructor
+public class GroupedDataRecord {
+    
+    private final String tableName;
+    
+    private final List<DataRecord> insertDataRecords;
+    
+    private final List<DataRecord> updateDataRecords;
+    
+    private final List<DataRecord> deleteDataRecords;
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 637131d..544c481 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
 import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
 import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
 import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -103,12 +104,12 @@ public final class AbstractJDBCImporterTest {
         DataRecord insertRecord = getDataRecord("INSERT");
         when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
         when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
-        when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(insertRecord));
+        when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(insertRecord));
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 1);
         verify(preparedStatement).setObject(2, 10);
         verify(preparedStatement).setObject(3, "INSERT");
-        verify(preparedStatement).execute();
+        verify(preparedStatement).addBatch();
     }
     
     @Test
@@ -116,11 +117,11 @@ public final class AbstractJDBCImporterTest {
         DataRecord deleteRecord = getDataRecord("DELETE");
         when(sqlBuilder.buildDeleteSQL(deleteRecord, mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
         when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
-        when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(deleteRecord));
+        when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(deleteRecord));
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 1);
         verify(preparedStatement).setObject(2, 10);
-        verify(preparedStatement).execute();
+        verify(preparedStatement).addBatch();
     }
     
     @Test
@@ -128,7 +129,7 @@ public final class AbstractJDBCImporterTest {
         DataRecord updateRecord = getDataRecord("UPDATE");
         when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
         when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
-        when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
+        when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
         jdbcImporter.run();
         verify(preparedStatement).setObject(1, 10);
         verify(preparedStatement).setObject(2, "UPDATE");
@@ -142,7 +143,7 @@ public final class AbstractJDBCImporterTest {
         DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
         when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
         when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
-        when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
+        when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
         jdbcImporter.run();
         InOrder inOrder = inOrder(preparedStatement);
         inOrder.verify(preparedStatement).setObject(1, 2);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMergerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMergerTest.java
new file mode 100644
index 0000000..f33eff3
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMergerTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+
+import com.google.common.collect.Lists;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test cases for {@link DataRecordMerger}: pairwise merging of INSERT/UPDATE/DELETE records and grouping by table name.
+ */
+public final class DataRecordMergerTest {
+    
+    private final DataRecordMerger dataRecordMerger = new DataRecordMerger();
+    
+    @Test(expected = UnexpectedDataRecordOrder.class)
+    public void assertInsertBeforeInsert() {
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+    }
+    
+    @Test(expected = UnexpectedDataRecordOrder.class)
+    public void assertUpdateBeforeInsert() {
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 2);
+        DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+    }
+    
+    @Test
+    public void assertDeleteBeforeInsert() {
+        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
+        DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+    }
+    
+    @Test
+    public void assertInsertBeforeUpdate() {
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(ScalingConstant.INSERT));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
+        assertThat(dataRecord.getColumn(0).getValue(), is(1));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+    }
+    
+    @Test
+    public void assertInsertBeforeUpdatePrimaryKey() {
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(ScalingConstant.INSERT));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
+        assertThat(dataRecord.getColumn(0).getValue(), is(2));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+    }
+    
+    @Test
+    public void assertUpdateBeforeUpdate() {
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
+        assertThat(dataRecord.getColumn(0).getValue(), is(1));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+    }
+    
+    @Test
+    public void assertUpdateBeforeUpdatePrimaryKey() {
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+        assertThat(dataRecord.getColumn(0).getValue(), is(2));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+    }
+    
+    @Test
+    public void assertUpdatePrimaryKeyBeforeUpdate() {
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(2, 2, 2);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+        assertThat(dataRecord.getColumn(0).getValue(), is(2));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+    }
+    
+    @Test
+    public void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+        assertThat(dataRecord.getColumn(0).getValue(), is(3));
+        assertThat(dataRecord.getColumn(1).getValue(), is(2));
+        assertThat(dataRecord.getColumn(2).getValue(), is(2));
+    }
+    
+    @Test(expected = UnsupportedOperationException.class)
+    public void assertDeleteBeforeUpdate() {
+        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+        dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+    }
+    
+    @Test
+    public void assertInsertBeforeDelete() {
+        DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+    }
+    
+    @Test
+    public void assertUpdateBeforeDelete() {
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+    }
+    
+    @Test
+    public void assertUpdatePrimaryKeyBeforeDelete() {
+        DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
+        Collection<DataRecord> actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+        assertThat(actual.size(), is(1));
+        DataRecord dataRecord = actual.iterator().next();
+        assertThat(dataRecord.getType(), is(ScalingConstant.DELETE));
+        assertThat(dataRecord.getTableName(), is("order"));
+        assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
+        assertThat(dataRecord.getColumn(0).getValue(), is(1));
+        assertThat(dataRecord.getColumn(1).getValue(), is(1));
+        assertThat(dataRecord.getColumn(2).getValue(), is(1));
+    }
+    
+    @Test(expected = UnexpectedDataRecordOrder.class)
+    public void assertDeleteBeforeDelete() {
+        DataRecord beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
+        DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+        dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+    }
+    
+    @Test
+    public void assertGroup() {
+        List<DataRecord> dataRecords = mockDataRecords();
+        List<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
+        assertThat(groupedDataRecords.size(), is(2));
+        assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
+        assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
+        assertThat(groupedDataRecords.get(0).getInsertDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getUpdateDataRecords().size(), is(1));
+        assertThat(groupedDataRecords.get(0).getDeleteDataRecords().size(), is(1));
+    }
+    
+    /**
+     * Seven t1 records over ids 1, 2 and 3 (assertGroup expects one INSERT, one UPDATE and one DELETE for t1) plus one t2 INSERT.
+     */
+    private List<DataRecord> mockDataRecords() {
+        return Lists.newArrayList(
+                mockInsertDataRecord("t1", 1, 1, 1),
+                mockUpdateDataRecord("t1", 1, 2, 1),
+                mockUpdateDataRecord("t1", 1, 2, 2),
+                mockUpdateDataRecord("t1", 2, 1, 1),
+                mockUpdateDataRecord("t1", 2, 2, 1),
+                mockUpdateDataRecord("t1", 2, 2, 2),
+                mockDeleteDataRecord("t1", 3, 1, 1),
+                mockInsertDataRecord("t2", 1, 1, 1)
+        );
+    }
+    
+    private DataRecord mockInsertDataRecord(final int id, final int userId, final int totalPrice) {
+        return mockInsertDataRecord("order", id, userId, totalPrice);
+    }
+    
+    private DataRecord mockInsertDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
+        DataRecord result = new DataRecord(new NopPosition(), 3);
+        result.setType(ScalingConstant.INSERT);
+        result.setTableName(tableName);
+        result.addColumn(new Column("id", id, true, true));
+        result.addColumn(new Column("user_id", userId, true, false));
+        result.addColumn(new Column("total_price", totalPrice, true, false));
+        return result;
+    }
+    
+    private DataRecord mockUpdateDataRecord(final int id, final int userId, final int totalPrice) {
+        return mockUpdateDataRecord("order", null, id, userId, totalPrice);
+    }
+    
+    private DataRecord mockUpdateDataRecord(final Integer oldId, final int id, final int userId, final int totalPrice) {
+        return mockUpdateDataRecord("order", oldId, id, userId, totalPrice);
+    }
+    
+    private DataRecord mockUpdateDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
+        return mockUpdateDataRecord(tableName, null, id, userId, totalPrice);
+    }
+    
+    private DataRecord mockUpdateDataRecord(final String tableName, final Integer oldId, final int id, final int userId, final int totalPrice) {
+        DataRecord result = new DataRecord(new NopPosition(), 3);
+        result.setType(ScalingConstant.UPDATE);
+        result.setTableName(tableName);
+        result.addColumn(new Column("id", oldId, id, null != oldId, true));
+        result.addColumn(new Column("user_id", userId, true, false));
+        result.addColumn(new Column("total_price", totalPrice, true, false));
+        return result;
+    }
+    
+    private DataRecord mockDeleteDataRecord(final int id, final int userId, final int totalPrice) {
+        return mockDeleteDataRecord("order", id, userId, totalPrice);
+    }
+    
+    private DataRecord mockDeleteDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
+        DataRecord result = new DataRecord(new NopPosition(), 3);
+        result.setType(ScalingConstant.DELETE);
+        result.setTableName(tableName);
+        result.addColumn(new Column("id", id, true, true));
+        result.addColumn(new Column("user_id", userId, true, false));
+        result.addColumn(new Column("total_price", totalPrice, true, false));
+        return result;
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecordTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecordTest.java
new file mode 100644
index 0000000..0e7eba6
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecordTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.record;
+
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test cases for the key and old-key accessors of {@link DataRecord}.
+ */
+public final class DataRecordTest {
+    
+    @Test
+    public void assertKeyEqual() {
+        DataRecord beforeDataRecord = new DataRecord(new NopPosition(), 2);
+        beforeDataRecord.setTableName("t1");
+        beforeDataRecord.addColumn(new Column("id", 1, true, true));
+        beforeDataRecord.addColumn(new Column("name", "1", true, false));
+        DataRecord afterDataRecord = new DataRecord(new NopPosition(), 2);
+        afterDataRecord.setTableName("t1");
+        afterDataRecord.addColumn(new Column("id", 1, true, true));
+        afterDataRecord.addColumn(new Column("name", "2", true, false));
+        assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
+    }
+    
+    @Test
+    public void assertOldKeyEqual() {
+        DataRecord beforeDataRecord = new DataRecord(new NopPosition(), 2);
+        beforeDataRecord.setTableName("t1");
+        beforeDataRecord.addColumn(new Column("id", 1, true, true));
+        beforeDataRecord.addColumn(new Column("name", "1", true, false));
+        DataRecord afterDataRecord = new DataRecord(new NopPosition(), 2);
+        afterDataRecord.setTableName("t1");
+        // The "id" column is given both 1 and 2; the assertion below expects the old key to be based on 1.
+        afterDataRecord.addColumn(new Column("id", 1, 2, true, true));
+        afterDataRecord.addColumn(new Column("name", "2", true, false));
+        assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getOldKey()));
+    }
+}