Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/11/13 02:40:08 UTC
[shardingsphere] branch master updated: Merge and batch execute the rows change (#8147)
This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c168ae5 Merge and batch execute the rows change (#8147)
c168ae5 is described below
commit c168ae562264923e18f5e3507a3c038e3d488f6f
Author: avalon5666 <64...@users.noreply.github.com>
AuthorDate: Fri Nov 13 10:38:50 2020 +0800
Merge and batch execute the rows change (#8147)
* Fix jdbc dumper judge primary key error
* Merge and batch execute the rows change (#7836)
---
.../executor/dumper/AbstractJDBCDumper.java | 2 +-
.../executor/importer/AbstractJDBCImporter.java | 106 ++++----
.../executor/importer/DataRecordMerger.java | 176 +++++++++++++
.../importer/UnexpectedDataRecordOrder.java | 29 +++
.../core/execute/executor/record/DataRecord.java | 31 +++
.../execute/executor/record/GroupedDataRecord.java | 36 +++
.../importer/AbstractJDBCImporterTest.java | 15 +-
.../executor/importer/DataRecordMergerTest.java | 280 +++++++++++++++++++++
.../execute/executor/record/DataRecordTest.java | 57 +++++
9 files changed, 682 insertions(+), 50 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
index 3217946..c6df1c1 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/dumper/AbstractJDBCDumper.java
@@ -94,7 +94,7 @@ public abstract class AbstractJDBCDumper extends AbstractShardingScalingExecutor
record.setType(ScalingConstant.INSERT);
record.setTableName(inventoryDumperConfiguration.getTableNameMap().get(inventoryDumperConfiguration.getTableName()));
for (int i = 1; i <= metaData.getColumnCount(); i++) {
- record.addColumn(new Column(metaData.getColumnName(i), readValue(rs, i), true, tableMetaData.isPrimaryKey(i)));
+ record.addColumn(new Column(metaData.getColumnName(i), readValue(rs, i), true, tableMetaData.isPrimaryKey(i - 1)));
}
pushRecord(record);
}
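
The dumper fix above is an off-by-one correction: JDBC's ResultSetMetaData indexes columns from 1, while the scaling table metadata's isPrimaryKey lookup is evidently 0-based, so passing the raw JDBC index shifted every primary-key check by one column. A minimal sketch of the mismatch, where TableMetaDataStub is a hypothetical stand-in for the real table metadata type:

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

final class PrimaryKeyIndexingExample {

    // JDBC column indexes run from 1 to getColumnCount() ...
    static void dumpRow(final ResultSet rs, final TableMetaDataStub tableMetaData) throws SQLException {
        ResultSetMetaData metaData = rs.getMetaData();
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
            // ... so a 0-based metadata lookup needs i - 1, not i.
            boolean primaryKey = tableMetaData.isPrimaryKey(i - 1);
            System.out.printf("%s primaryKey=%b%n", metaData.getColumnName(i), primaryKey);
        }
    }

    // Hypothetical: mirrors the 0-based lookup the patch implies.
    interface TableMetaDataStub {

        boolean isPrimaryKey(int columnIndexFromZero);
    }
}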
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index 3ca1ba7..8853501 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
@@ -28,6 +29,7 @@ import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
@@ -36,9 +38,8 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.sql.SQLIntegrityConstraintViolationException;
-import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Abstract JDBC importer implementation.
@@ -46,6 +47,8 @@ import java.util.List;
@Slf4j
public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecutor<IncrementalPosition> implements Importer {
+ private static final DataRecordMerger MERGER = new DataRecordMerger();
+
private final ImporterConfiguration importerConfig;
private final DataSourceManager dataSourceManager;
@@ -77,7 +80,7 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
@Override
public final void write() {
while (isRunning()) {
- List<Record> records = channel.fetchRecords(100, 3);
+ List<Record> records = channel.fetchRecords(1024, 3);
if (null != records && !records.isEmpty()) {
flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfiguration()), records);
if (FinishedRecord.class.equals(records.get(records.size() - 1).getClass())) {
@@ -90,65 +93,79 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
}
private void flush(final DataSource dataSource, final List<Record> buffer) {
+ List<GroupedDataRecord> groupedDataRecords = MERGER.group(buffer.stream()
+ .filter(each -> each instanceof DataRecord)
+ .map(each -> (DataRecord) each)
+ .collect(Collectors.toList()));
+ groupedDataRecords.forEach(each -> {
+ if (CollectionUtils.isNotEmpty(each.getDeleteDataRecords())) {
+ flushInternal(dataSource, each.getDeleteDataRecords());
+ }
+ if (CollectionUtils.isNotEmpty(each.getInsertDataRecords())) {
+ flushInternal(dataSource, each.getInsertDataRecords());
+ }
+ if (CollectionUtils.isNotEmpty(each.getUpdateDataRecords())) {
+ flushInternal(dataSource, each.getUpdateDataRecords());
+ }
+ });
+ }
+
+ private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
boolean success = tryFlush(dataSource, buffer);
if (isRunning() && !success) {
throw new SyncTaskExecuteException("write failed.");
}
}
- private boolean tryFlush(final DataSource dataSource, final List<Record> buffer) {
+ private boolean tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
int retryTimes = importerConfig.getRetryTimes();
- List<Record> unflushed = buffer;
do {
- unflushed = doFlush(dataSource, unflushed);
- } while (isRunning() && !unflushed.isEmpty() && retryTimes-- > 0);
- return unflushed.isEmpty();
+ try {
+ doFlush(dataSource, buffer);
+ return true;
+ } catch (SQLException ex) {
+ log.error("flush failed: ", ex);
+ }
+ } while (isRunning() && retryTimes-- > 0);
+ return false;
}
- private List<Record> doFlush(final DataSource dataSource, final List<Record> buffer) {
- int i = 0;
+ private void doFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
- for (; i < buffer.size(); i++) {
- execute(connection, buffer.get(i));
- }
- connection.commit();
- } catch (final SQLException ex) {
- log.error("flush failed: {}", buffer.get(i), ex);
- return buffer.subList(i, buffer.size());
- }
- return Collections.emptyList();
- }
-
- private void execute(final Connection connection, final Record record) throws SQLException {
- if (DataRecord.class.equals(record.getClass())) {
- DataRecord dataRecord = (DataRecord) record;
- switch (dataRecord.getType()) {
+ switch (buffer.get(0).getType()) {
case ScalingConstant.INSERT:
- executeInsert(connection, dataRecord);
+ executeBatchInsert(connection, buffer);
break;
case ScalingConstant.UPDATE:
- executeUpdate(connection, dataRecord);
+ executeUpdate(connection, buffer);
break;
case ScalingConstant.DELETE:
- executeDelete(connection, dataRecord);
+ executeBatchDelete(connection, buffer);
break;
default:
break;
}
+ connection.commit();
}
}
- private void executeInsert(final Connection connection, final DataRecord record) throws SQLException {
- String insertSql = sqlBuilder.buildInsertSQL(record);
+ private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
+ String insertSql = sqlBuilder.buildInsertSQL(dataRecords.get(0));
PreparedStatement ps = connection.prepareStatement(insertSql);
ps.setQueryTimeout(30);
- try {
- for (int i = 0; i < record.getColumnCount(); i++) {
- ps.setObject(i + 1, record.getColumn(i).getValue());
+ for (DataRecord each : dataRecords) {
+ for (int i = 0; i < each.getColumnCount(); i++) {
+ ps.setObject(i + 1, each.getColumn(i).getValue());
}
- ps.execute();
- } catch (final SQLIntegrityConstraintViolationException ignored) {
+ ps.addBatch();
+ }
+ ps.executeBatch();
+ }
+
+ private void executeUpdate(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
+ for (DataRecord each : dataRecords) {
+ executeUpdate(connection, each);
}
}
@@ -169,13 +186,18 @@ public abstract class AbstractJDBCImporter extends AbstractShardingScalingExecut
ps.execute();
}
- private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
- List<Column> conditionColumns = RecordUtil.extractConditionColumns(record, importerConfig.getShardingColumnsMap().get(record.getTableName()));
- String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
- PreparedStatement ps = connection.prepareStatement(deleteSql);
- for (int i = 0; i < conditionColumns.size(); i++) {
- ps.setObject(i + 1, conditionColumns.get(i).getValue());
+ private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
+ List<Column> conditionColumns = RecordUtil.extractConditionColumns(dataRecords.get(0), importerConfig.getShardingColumnsMap().get(dataRecords.get(0).getTableName()));
+ String deleteSQL = sqlBuilder.buildDeleteSQL(dataRecords.get(0), conditionColumns);
+ PreparedStatement ps = connection.prepareStatement(deleteSQL);
+ ps.setQueryTimeout(30);
+ for (DataRecord each : dataRecords) {
+ conditionColumns = RecordUtil.extractConditionColumns(each, importerConfig.getShardingColumnsMap().get(each.getTableName()));
+ for (int i = 0; i < conditionColumns.size(); i++) {
+ ps.setObject(i + 1, conditionColumns.get(i).getValue());
+ }
+ ps.addBatch();
}
- ps.execute();
+ ps.executeBatch();
}
}
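
The importer rewrite above is where the batching happens: records fetched from the channel (now up to 1024 per poll instead of 100) are merged, grouped by table and operation type, and written with PreparedStatement batches inside a single transaction instead of one execute() per record. A self-contained sketch of that pattern, using an illustrative t_order table that is not taken from the patch:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class BatchInsertExample {

    static void batchInsert(final Connection connection, final List<Object[]> rows) throws SQLException {
        connection.setAutoCommit(false);
        try (PreparedStatement ps = connection.prepareStatement("INSERT INTO t_order (id, user_id) VALUES (?, ?)")) {
            for (Object[] row : rows) {
                for (int i = 0; i < row.length; i++) {
                    ps.setObject(i + 1, row[i]);
                }
                ps.addBatch();
            }
            // One round trip for the whole batch instead of one per record.
            ps.executeBatch();
        }
        connection.commit();
    }
}

Also visible in the patch itself: each group flushes deletes first, then inserts, then updates, and a failed flush now retries the whole buffer rather than resuming from the failing record.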
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMerger.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMerger.java
new file mode 100644
index 0000000..a3d81a1
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMerger.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Data Record merger.
+ */
+public class DataRecordMerger {
+
+ /**
+ * Merge data record.
+ * insert + insert -> exception
+ * update + insert -> exception
+ * delete + insert -> insert
+ * insert + update -> insert
+ * update + update -> update
+ * delete + update -> exception
+ * insert + delete -> delete
+ * update + delete -> delete
+ * delete + delete -> exception
+ *
+ * @param dataRecords data records
+ * @return merged data records
+ */
+ public List<DataRecord> merge(final List<DataRecord> dataRecords) {
+ Map<DataRecord.Key, DataRecord> result = new HashMap<>();
+ dataRecords.forEach(dataRecord -> {
+ if (ScalingConstant.INSERT.equals(dataRecord.getType())) {
+ mergeInsert(dataRecord, result);
+ } else if (ScalingConstant.UPDATE.equals(dataRecord.getType())) {
+ mergeUpdate(dataRecord, result);
+ } else if (ScalingConstant.DELETE.equals(dataRecord.getType())) {
+ mergeDelete(dataRecord, result);
+ }
+ });
+ return new ArrayList<>(result.values());
+ }
+
+ /**
+ * Group by table and type.
+ *
+ * @param dataRecords data records
+ * @return grouped data records
+ */
+ public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
+ List<DataRecord> mergedDataRecords = merge(dataRecords);
+ List<GroupedDataRecord> result = new ArrayList<>(100);
+ Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
+ for (Map.Entry<String, List<DataRecord>> each : tableGroup.entrySet()) {
+ Map<String, List<DataRecord>> typeGroup = each.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
+ result.add(new GroupedDataRecord(each.getKey(),
+ typeGroup.get(ScalingConstant.INSERT),
+ typeGroup.get(ScalingConstant.UPDATE),
+ typeGroup.get(ScalingConstant.DELETE)));
+ }
+ return result;
+ }
+
+ private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+ DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
+ if (null != beforeDataRecord && !ScalingConstant.DELETE.equals(beforeDataRecord.getType())) {
+ throw new UnexpectedDataRecordOrder(beforeDataRecord, dataRecord);
+ }
+ dataRecords.put(dataRecord.getKey(), dataRecord);
+ }
+
+ private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+ DataRecord beforeDataRecord = checkUpdatedPrimaryKey(dataRecord)
+ ? dataRecords.get(dataRecord.getOldKey())
+ : dataRecords.get(dataRecord.getKey());
+ if (null == beforeDataRecord) {
+ dataRecords.put(dataRecord.getKey(), dataRecord);
+ return;
+ }
+ if (ScalingConstant.DELETE.equals(beforeDataRecord.getType())) {
+ throw new UnsupportedOperationException();
+ }
+ if (checkUpdatedPrimaryKey(dataRecord) && dataRecords.containsKey(dataRecord.getOldKey())) {
+ dataRecords.remove(dataRecord.getOldKey());
+ }
+ if (ScalingConstant.INSERT.equals(beforeDataRecord.getType())) {
+ DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
+ mergedDataRecord.setTableName(dataRecord.getTableName());
+ mergedDataRecord.setType(ScalingConstant.INSERT);
+ dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+ return;
+ }
+ if (ScalingConstant.UPDATE.equals(beforeDataRecord.getType())) {
+ DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
+ mergedDataRecord.setTableName(dataRecord.getTableName());
+ mergedDataRecord.setType(ScalingConstant.UPDATE);
+ dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+ return;
+ }
+ }
+
+ private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
+ DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
+ if (null != beforeDataRecord && (ScalingConstant.DELETE.equals(beforeDataRecord.getType()))) {
+ throw new UnexpectedDataRecordOrder(beforeDataRecord, dataRecord);
+ }
+ if (null != beforeDataRecord && ScalingConstant.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedPrimaryKey(beforeDataRecord)) {
+ // primary key updated + delete
+ DataRecord mergedDataRecord = new DataRecord(dataRecord.getPosition(), dataRecord.getColumnCount());
+ for (int i = 0; i < dataRecord.getColumnCount(); i++) {
+ mergedDataRecord.addColumn(new Column(
+ dataRecord.getColumn(i).getName(),
+ dataRecord.getColumn(i).isPrimaryKey()
+ ? beforeDataRecord.getColumn(i).getOldValue()
+ : beforeDataRecord.getColumn(i).getValue(),
+ true,
+ dataRecord.getColumn(i).isPrimaryKey()
+ ));
+ }
+ mergedDataRecord.setTableName(dataRecord.getTableName());
+ mergedDataRecord.setType(ScalingConstant.DELETE);
+ dataRecords.remove(beforeDataRecord.getKey());
+ dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+ } else {
+ dataRecords.put(dataRecord.getKey(), dataRecord);
+ }
+ }
+
+ private boolean checkUpdatedPrimaryKey(final DataRecord dataRecord) {
+ return RecordUtil.extractPrimaryColumns(dataRecord).stream().anyMatch(each -> each.isUpdated());
+ }
+
+ private DataRecord mergeColumn(final DataRecord preDataRecord, final DataRecord curDataRecord) {
+ DataRecord result = new DataRecord(curDataRecord.getPosition(), curDataRecord.getColumnCount());
+ for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
+ result.addColumn(new Column(
+ curDataRecord.getColumn(i).getName(),
+ preDataRecord.getColumn(i).isPrimaryKey()
+ ? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
+ : null,
+ curDataRecord.getColumn(i).getValue(),
+ preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
+ curDataRecord.getColumn(i).isPrimaryKey()
+ ));
+ }
+ return result;
+ }
+
+ private Object mergePrimaryKeyOldValue(final Column beforeColumn, final Column column) {
+ return beforeColumn.isUpdated()
+ ? beforeColumn.getOldValue()
+ : (column.isUpdated() ? column.getOldValue() : null);
+ }
+}
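
The merge table in the class javadoc is the heart of the change: successive changes to the same primary key collapse into at most one record before flushing. A hedged usage sketch built only from types that appear in this patch (the t_order column layout is illustrative):

import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.execute.executor.importer.DataRecordMerger;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;

import java.util.Arrays;
import java.util.List;

final class MergerUsageExample {

    public static void main(final String[] args) {
        // insert + update on the same key merges to a single INSERT that already
        // carries the updated values, per the javadoc table.
        List<DataRecord> merged = new DataRecordMerger().merge(
                Arrays.asList(dataRecord(ScalingConstant.INSERT, "init"), dataRecord(ScalingConstant.UPDATE, "paid")));
        System.out.println(merged.size() + " record(s) of type " + merged.get(0).getType());
    }

    private static DataRecord dataRecord(final String type, final String status) {
        DataRecord result = new DataRecord(new NopPosition(), 2);
        result.setType(type);
        result.setTableName("t_order");
        // The UPDATE leaves the primary key unchanged (updated = false),
        // so the merger finds the earlier record under the same key.
        result.addColumn(new Column("id", 1, ScalingConstant.INSERT.equals(type), true));
        result.addColumn(new Column("status", status, true, false));
        return result;
    }
}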
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/UnexpectedDataRecordOrder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/UnexpectedDataRecordOrder.java
new file mode 100644
index 0000000..bad98c2
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/UnexpectedDataRecordOrder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+
+@RequiredArgsConstructor
+public class UnexpectedDataRecordOrder extends RuntimeException {
+
+ private final DataRecord beforeDataRecord;
+
+ private final DataRecord afterDataRecord;
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
index e8594c0..c92ffcc 100755
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.execute.executor.record;
+import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.scaling.core.job.position.Position;
@@ -41,6 +42,8 @@ public final class DataRecord extends Record {
private final List<Object> primaryKeyValue = new LinkedList<>();
+ private final List<Object> oldPrimaryKeyValues = new ArrayList<>();
+
private String type;
private String tableName;
@@ -59,6 +62,7 @@ public final class DataRecord extends Record {
columns.add(data);
if (data.isPrimaryKey()) {
primaryKeyValue.add(data.getValue());
+ oldPrimaryKeyValues.add(data.getOldValue());
}
}
@@ -80,4 +84,31 @@ public final class DataRecord extends Record {
public Column getColumn(final int index) {
return columns.get(index);
}
+
+ /**
+ * Get key.
+ *
+ * @return key
+ */
+ public Key getKey() {
+ return new Key(tableName, primaryKeyValue);
+ }
+
+ /**
+ * Get old key.
+ *
+ * @return key
+ */
+ public Key getOldKey() {
+ return new Key(tableName, oldPrimaryKeyValues);
+ }
+
+ @EqualsAndHashCode
+ @RequiredArgsConstructor
+ public static class Key {
+
+ private final String tableName;
+
+ private final List<Object> primaryKeyValues;
+ }
}
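
The new DataRecord.Key gives each row change a value-equal handle (table name plus primary key values), which is what lets DataRecordMerger deduplicate through a plain HashMap; getOldKey() covers an UPDATE that rewrites the primary key itself. A short sketch using only types from this patch:

import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;

final class OldKeyExample {

    public static void main(final String[] args) {
        DataRecord original = new DataRecord(new NopPosition(), 1);
        original.setTableName("t_order");
        original.addColumn(new Column("id", 1, true, true));
        DataRecord update = new DataRecord(new NopPosition(), 1);
        update.setTableName("t_order");
        // Column(name, oldValue, value, updated, primaryKey): id changed from 1 to 2.
        update.addColumn(new Column("id", 1, 2, true, true));
        // getKey() addresses the row after the change, getOldKey() before it,
        // so the merger can find the earlier record when the key itself moved.
        System.out.println(update.getOldKey().equals(original.getKey())); // true
        System.out.println(update.getKey().equals(original.getKey()));    // false
    }
}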
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/GroupedDataRecord.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/GroupedDataRecord.java
new file mode 100644
index 0000000..f63d179
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/GroupedDataRecord.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.record;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.List;
+
+@Getter
+@RequiredArgsConstructor
+public class GroupedDataRecord {
+
+ private final String tableName;
+
+ private final List<DataRecord> insertDataRecords;
+
+ private final List<DataRecord> updateDataRecords;
+
+ private final List<DataRecord> deleteDataRecords;
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 637131d..544c481 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.scaling.core.execute.executor.importer;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -103,12 +104,12 @@ public final class AbstractJDBCImporterTest {
DataRecord insertRecord = getDataRecord("INSERT");
when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
- when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(insertRecord));
+ when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(insertRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);
verify(preparedStatement).setObject(3, "INSERT");
- verify(preparedStatement).execute();
+ verify(preparedStatement).addBatch();
}
@Test
@@ -116,11 +117,11 @@ public final class AbstractJDBCImporterTest {
DataRecord deleteRecord = getDataRecord("DELETE");
when(sqlBuilder.buildDeleteSQL(deleteRecord, mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
- when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(deleteRecord));
+ when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(deleteRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);
- verify(preparedStatement).execute();
+ verify(preparedStatement).addBatch();
}
@Test
@@ -128,7 +129,7 @@ public final class AbstractJDBCImporterTest {
DataRecord updateRecord = getDataRecord("UPDATE");
when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
- when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
+ when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
verify(preparedStatement).setObject(1, 10);
verify(preparedStatement).setObject(2, "UPDATE");
@@ -142,7 +143,7 @@ public final class AbstractJDBCImporterTest {
DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
when(sqlBuilder.buildUpdateSQL(updateRecord, mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
- when(channel.fetchRecords(100, 3)).thenReturn(mockRecords(updateRecord));
+ when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
InOrder inOrder = inOrder(preparedStatement);
inOrder.verify(preparedStatement).setObject(1, 2);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMergerTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMergerTest.java
new file mode 100644
index 0000000..f33eff3
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/DataRecordMergerTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+
+import com.google.common.collect.Lists;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+public class DataRecordMergerTest {
+
+ private final DataRecordMerger dataRecordMerger = new DataRecordMerger();
+
+ private DataRecord beforeDataRecord;
+
+ private DataRecord afterDataRecord;
+
+ private Collection<DataRecord> actual;
+
+ @Test(expected = UnexpectedDataRecordOrder.class)
+ public void assertInsertBeforeInsert() {
+ beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+ afterDataRecord = mockInsertDataRecord(1, 1, 1);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ }
+
+ @Test(expected = UnexpectedDataRecordOrder.class)
+ public void assertUpdateBeforeInsert() {
+ beforeDataRecord = mockUpdateDataRecord(1, 2, 2);
+ afterDataRecord = mockInsertDataRecord(1, 1, 1);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ }
+
+ @Test
+ public void assertDeleteBeforeInsert() {
+ beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
+ afterDataRecord = mockInsertDataRecord(1, 1, 1);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+ }
+
+ @Test
+ public void assertInsertBeforeUpdate() {
+ beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+ afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(ScalingConstant.INSERT));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
+ assertThat(dataRecord.getColumn(0).getValue(), is(1));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ }
+
+ @Test
+ public void assertInsertBeforeUpdatePrimaryKey() {
+ beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+ afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(ScalingConstant.INSERT));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
+ assertThat(dataRecord.getColumn(0).getValue(), is(2));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ }
+
+ @Test
+ public void assertUpdateBeforeUpdate() {
+ beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+ afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
+ assertThat(dataRecord.getColumn(0).getValue(), is(1));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ }
+
+ @Test
+ public void assertUpdateBeforeUpdatePrimaryKey() {
+ beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+ afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+ assertThat(dataRecord.getColumn(0).getValue(), is(2));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ }
+
+ @Test
+ public void assertUpdatePrimaryKeyBeforeUpdate() {
+ beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+ afterDataRecord = mockUpdateDataRecord(2, 2, 2);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+ assertThat(dataRecord.getColumn(0).getValue(), is(2));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ }
+
+ @Test
+ public void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
+ beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+ afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(ScalingConstant.UPDATE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
+ assertThat(dataRecord.getColumn(0).getValue(), is(3));
+ assertThat(dataRecord.getColumn(1).getValue(), is(2));
+ assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void assertDeleteBeforeUpdate() {
+ beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
+ afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ }
+
+ @Test
+ public void assertInsertBeforeDelete() {
+ beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+ afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+ }
+
+ @Test
+ public void assertUpdateBeforeDelete() {
+ beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+ afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+ }
+
+ @Test
+ public void assertUpdatePrimaryKeyBeforeDelete() {
+ beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+ afterDataRecord = mockDeleteDataRecord(2, 1, 1);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ assertThat(actual.size(), is(1));
+ DataRecord dataRecord = actual.iterator().next();
+ assertThat(dataRecord.getType(), is(ScalingConstant.DELETE));
+ assertThat(dataRecord.getTableName(), is("order"));
+ assertThat(dataRecord.getColumn(0).getOldValue(), nullValue());
+ assertThat(dataRecord.getColumn(0).getValue(), is(1));
+ assertThat(dataRecord.getColumn(1).getValue(), is(1));
+ assertThat(dataRecord.getColumn(2).getValue(), is(1));
+ }
+
+ @Test(expected = UnexpectedDataRecordOrder.class)
+ public void assertDeleteBeforeDelete() {
+ beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
+ afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+ actual = dataRecordMerger.merge(Lists.newArrayList(beforeDataRecord, afterDataRecord));
+ }
+
+ @Test
+ public void assertGroup() {
+ List<DataRecord> dataRecords = mockDataRecords();
+ List<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
+ assertThat(groupedDataRecords.size(), is(2));
+ assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
+ assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
+ assertThat(groupedDataRecords.get(0).getInsertDataRecords().size(), is(1));
+ assertThat(groupedDataRecords.get(0).getUpdateDataRecords().size(), is(1));
+ assertThat(groupedDataRecords.get(0).getDeleteDataRecords().size(), is(1));
+ }
+
+ private List<DataRecord> mockDataRecords() {
+ return Lists.newArrayList(
+ mockInsertDataRecord("t1", 1, 1, 1),
+ mockUpdateDataRecord("t1", 1, 2, 1),
+ mockUpdateDataRecord("t1", 1, 2, 2),
+ mockUpdateDataRecord("t1", 2, 1, 1),
+ mockUpdateDataRecord("t1", 2, 2, 1),
+ mockUpdateDataRecord("t1", 2, 2, 2),
+ mockDeleteDataRecord("t1", 3, 1, 1),
+ mockInsertDataRecord("t2", 1, 1, 1)
+ );
+ }
+
+ private DataRecord mockInsertDataRecord(final int id, final int userId, final int totalPrice) {
+ return mockInsertDataRecord("order", id, userId, totalPrice);
+ }
+
+ private DataRecord mockInsertDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
+ DataRecord result = new DataRecord(new NopPosition(), 3);
+ result.setType(ScalingConstant.INSERT);
+ result.setTableName(tableName);
+ result.addColumn(new Column("id", id, true, true));
+ result.addColumn(new Column("user_id", userId, true, false));
+ result.addColumn(new Column("total_price", totalPrice, true, false));
+ return result;
+ }
+
+ private DataRecord mockUpdateDataRecord(final int id, final int userId, final int totalPrice) {
+ return mockUpdateDataRecord("order", null, id, userId, totalPrice);
+ }
+
+ private DataRecord mockUpdateDataRecord(final Integer oldId, final int id, final int userId, final int totalPrice) {
+ return mockUpdateDataRecord("order", oldId, id, userId, totalPrice);
+ }
+
+ private DataRecord mockUpdateDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
+ return mockUpdateDataRecord(tableName, null, id, userId, totalPrice);
+ }
+
+ private DataRecord mockUpdateDataRecord(final String tableName, final Integer oldId, final int id, final int userId, final int totalPrice) {
+ DataRecord result = new DataRecord(new NopPosition(), 3);
+ result.setType(ScalingConstant.UPDATE);
+ result.setTableName(tableName);
+ result.addColumn(new Column("id", oldId, id, null != oldId, true));
+ result.addColumn(new Column("user_id", userId, true, false));
+ result.addColumn(new Column("total_price", totalPrice, true, false));
+ return result;
+ }
+
+ private DataRecord mockDeleteDataRecord(final int id, final int userId, final int totalPrice) {
+ return mockDeleteDataRecord("order", id, userId, totalPrice);
+ }
+
+ private DataRecord mockDeleteDataRecord(final String tableName, final int id, final int userId, final int totalPrice) {
+ DataRecord preDataRecord = new DataRecord(new NopPosition(), 3);
+ preDataRecord.setType(ScalingConstant.DELETE);
+ preDataRecord.setTableName(tableName);
+ preDataRecord.addColumn(new Column("id", id, true, true));
+ preDataRecord.addColumn(new Column("user_id", userId, true, false));
+ preDataRecord.addColumn(new Column("total_price", totalPrice, true, false));
+ return preDataRecord;
+ }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecordTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecordTest.java
new file mode 100644
index 0000000..0e7eba6
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecordTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.record;
+
+import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class DataRecordTest {
+
+ private DataRecord beforeDataRecord;
+
+ private DataRecord afterDataRecord;
+
+ @Test
+ public void assertKeyEqual() {
+ beforeDataRecord = new DataRecord(new NopPosition(), 2);
+ beforeDataRecord.setTableName("t1");
+ beforeDataRecord.addColumn(new Column("id", 1, true, true));
+ beforeDataRecord.addColumn(new Column("name", "1", true, false));
+ afterDataRecord = new DataRecord(new NopPosition(), 2);
+ afterDataRecord.setTableName("t1");
+ afterDataRecord.addColumn(new Column("id", 1, true, true));
+ afterDataRecord.addColumn(new Column("name", "2", true, false));
+ assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getKey()));
+ }
+
+ @Test
+ public void assertOldKeyEqual() {
+ beforeDataRecord = new DataRecord(new NopPosition(), 2);
+ beforeDataRecord.setTableName("t1");
+ beforeDataRecord.addColumn(new Column("id", 1, true, true));
+ beforeDataRecord.addColumn(new Column("name", "1", true, false));
+ afterDataRecord = new DataRecord(new NopPosition(), 2);
+ afterDataRecord.setTableName("t1");
+ afterDataRecord.addColumn(new Column("id", 1, 2, true, true));
+ afterDataRecord.addColumn(new Column("name", "2", true, false));
+ assertThat(beforeDataRecord.getKey(), is(afterDataRecord.getOldKey()));
+ }
+}