Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/26 07:29:03 UTC
[shardingsphere] branch master updated: Refactor data record merge at pipeline (#25307)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7195cdf91a4 Refactor data record merge at pipeline (#25307)
7195cdf91a4 is described below
commit 7195cdf91a445d9de521328d27d1a289a05669cf
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Apr 26 15:28:46 2023 +0800
Refactor data record merge at pipeline (#25307)
* Refactor data record merge at pipeline
* Combine DataRecord merge and group
* Improve error reason
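
The call-site effect of combining merge and group is visible in DataSourceImporter#flush in the diff below: a single group() call whose batch buckets are flushed with executeBatch, while the remainder is replayed row by row (loop copied from this diff):

    for (GroupedDataRecord each : MERGER.group(dataRecords)) {
        flushInternal(dataSource, each.getBatchDeleteDataRecords());
        flushInternal(dataSource, each.getBatchInsertDataRecords());
        flushInternal(dataSource, each.getBatchUpdateDataRecords());
        // Records whose key occurs more than once in the batch keep their original order.
        sequentialFlush(dataSource, each.getNonBatchRecords());
    }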
---
.../pipeline/api/ingest/record/DataRecord.java | 2 +-
.../api/ingest/record/GroupedDataRecord.java | 8 +-
.../job/PipelineImporterJobWriteException.java | 4 +
.../pipeline/core/importer/DataRecordMerger.java | 156 +++++-----------
.../pipeline/core/importer/DataSourceImporter.java | 73 ++++++--
.../core/importer/DataRecordMergerTest.java | 202 +++++++--------------
.../pipeline/cases/task/E2EIncrementalTask.java | 15 +-
.../core/importer/DataSourceImporterTest.java | 1 +
8 files changed, 190 insertions(+), 271 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index c9cb71f7f6a..956887c95f7 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -33,7 +33,7 @@ import java.util.List;
*/
@Getter
@Setter
-@EqualsAndHashCode(of = {"tableName", "uniqueKeyValue"}, callSuper = false)
+@EqualsAndHashCode(of = "tableName", callSuper = false)
@ToString
public final class DataRecord extends Record {
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java
index d0ae80aaaec..4cb8b94939d 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/GroupedDataRecord.java
@@ -30,9 +30,11 @@ public final class GroupedDataRecord {
private final String tableName;
- private final List<DataRecord> insertDataRecords;
+ private final List<DataRecord> batchInsertDataRecords;
- private final List<DataRecord> updateDataRecords;
+ private final List<DataRecord> batchUpdateDataRecords;
- private final List<DataRecord> deleteDataRecords;
+ private final List<DataRecord> batchDeleteDataRecords;
+
+ private final List<DataRecord> nonBatchRecords;
}
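
To make the four buckets concrete, a minimal construction sketch follows, mirroring the five-argument constructor used later in DataRecordMerger (the table name and list variables here are hypothetical):

    GroupedDataRecord grouped = new GroupedDataRecord("t_order",
            batchInserts,  // INSERT records whose key is unique within the batch
            batchUpdates,  // UPDATE records whose key is unique within the batch
            batchDeletes,  // DELETE records whose key is unique within the batch
            nonBatch);     // records whose key repeats; these must be written one by one, in order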
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
index b0f832722f7..48cf2130eea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
@@ -30,4 +30,8 @@ public final class PipelineImporterJobWriteException extends PipelineSQLExceptio
public PipelineImporterJobWriteException(final Exception cause) {
super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data failed.", cause);
}
+
+ public PipelineImporterJobWriteException(final String reason, final Exception cause) {
+ super(XOpenSQLState.GENERAL_ERROR, 91, reason, cause);
+ }
}
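
The added constructor lets callers attach a specific reason; sequentialFlush later in this diff uses it as:

    throw new PipelineImporterJobWriteException(String.format("Write failed, record=%s", each), ex);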
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
index 9b7ba2663e4..42dd3f29152 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
@@ -17,20 +17,21 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord.Key;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -38,37 +39,6 @@ import java.util.stream.Collectors;
*/
public final class DataRecordMerger {
- /**
- * Merge data record.
- * <pre>
- * insert + insert -> exception
- * update + insert -> exception
- * delete + insert -> insert
- * insert + update -> insert
- * update + update -> update
- * delete + update -> exception
- * insert + delete -> delete
- * update + delete -> delete
- * delete + delete -> exception
- * </pre>
- *
- * @param dataRecords data records
- * @return merged data records
- */
- public List<DataRecord> merge(final List<DataRecord> dataRecords) {
- Map<DataRecord.Key, DataRecord> result = new HashMap<>();
- dataRecords.forEach(each -> {
- if (IngestDataChangeType.INSERT.equals(each.getType())) {
- mergeInsert(each, result);
- } else if (IngestDataChangeType.UPDATE.equals(each.getType())) {
- mergeUpdate(each, result);
- } else if (IngestDataChangeType.DELETE.equals(each.getType())) {
- mergeDelete(each, result);
- }
- });
- return new ArrayList<>(result.values());
- }
-
/**
* Group by table and type.
*
@@ -76,90 +46,50 @@ public final class DataRecordMerger {
* @return grouped data records
*/
public List<GroupedDataRecord> group(final List<DataRecord> dataRecords) {
- List<GroupedDataRecord> result = new ArrayList<>(100);
- List<DataRecord> mergedDataRecords = dataRecords.get(0).getUniqueKeyValue().isEmpty() ? dataRecords : merge(dataRecords);
- Map<String, List<DataRecord>> tableGroup = mergedDataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
- for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
- Map<String, List<DataRecord>> typeGroup = entry.getValue().stream().collect(Collectors.groupingBy(DataRecord::getType));
- result.add(new GroupedDataRecord(entry.getKey(), typeGroup.get(IngestDataChangeType.INSERT), typeGroup.get(IngestDataChangeType.UPDATE), typeGroup.get(IngestDataChangeType.DELETE)));
- }
- return result;
- }
-
- private void mergeInsert(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
- DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
- ShardingSpherePreconditions.checkState(null == beforeDataRecord
- || IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
- dataRecords.put(dataRecord.getKey(), dataRecord);
- }
-
- private void mergeUpdate(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
- DataRecord beforeDataRecord = checkUpdatedPrimaryKey(dataRecord) ? dataRecords.get(dataRecord.getOldKey()) : dataRecords.get(dataRecord.getKey());
- if (null == beforeDataRecord) {
- dataRecords.put(dataRecord.getKey(), dataRecord);
- return;
- }
- ShardingSpherePreconditions.checkState(!IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new UnsupportedSQLOperationException("Not Delete"));
- if (checkUpdatedPrimaryKey(dataRecord)) {
- dataRecords.remove(dataRecord.getOldKey());
- }
- if (IngestDataChangeType.INSERT.equals(beforeDataRecord.getType())) {
- DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
- mergedDataRecord.setTableName(dataRecord.getTableName());
- mergedDataRecord.setType(IngestDataChangeType.INSERT);
- dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
- return;
+ int insertCount = 0;
+ Map<Key, Boolean> duplicateKeyMap = new HashMap<>();
+ Set<String> tableNames = new LinkedHashSet<>();
+ for (DataRecord each : dataRecords) {
+ if (IngestDataChangeType.INSERT.equals(each.getType())) {
+ insertCount++;
+ }
+ tableNames.add(each.getTableName());
+ Key key = getKeyFromDataRecord(each);
+ if (duplicateKeyMap.containsKey(key)) {
+ duplicateKeyMap.put(key, true);
+ continue;
+ }
+ duplicateKeyMap.put(key, false);
}
- if (IngestDataChangeType.UPDATE.equals(beforeDataRecord.getType())) {
- DataRecord mergedDataRecord = mergeColumn(beforeDataRecord, dataRecord);
- mergedDataRecord.setTableName(dataRecord.getTableName());
- mergedDataRecord.setType(IngestDataChangeType.UPDATE);
- dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
+ List<GroupedDataRecord> result = new ArrayList<>(100);
+ if (insertCount == dataRecords.size()) {
+ Map<String, List<DataRecord>> tableGroup = dataRecords.stream().collect(Collectors.groupingBy(DataRecord::getTableName));
+ for (Entry<String, List<DataRecord>> entry : tableGroup.entrySet()) {
+ result.add(new GroupedDataRecord(entry.getKey(), entry.getValue(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+ }
+ return result;
}
- }
-
- private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
- DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
- ShardingSpherePreconditions.checkState(null == beforeDataRecord
- || !IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
- if (null != beforeDataRecord && IngestDataChangeType.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedPrimaryKey(beforeDataRecord)) {
- DataRecord mergedDataRecord = new DataRecord(dataRecord.getPosition(), dataRecord.getColumnCount());
- for (int i = 0; i < dataRecord.getColumnCount(); i++) {
- mergedDataRecord.addColumn(new Column(dataRecord.getColumn(i).getName(),
- dataRecord.getColumn(i).isUniqueKey() ? beforeDataRecord.getColumn(i).getOldValue() : beforeDataRecord.getColumn(i).getValue(), true, dataRecord.getColumn(i).isUniqueKey()));
+ Map<String, List<DataRecord>> nonBatchRecords = new LinkedHashMap<>();
+ Map<String, Map<String, List<DataRecord>>> batchDataRecords = new LinkedHashMap<>();
+ for (DataRecord each : dataRecords) {
+ Key key = getKeyFromDataRecord(each);
+ if (duplicateKeyMap.getOrDefault(key, false)) {
+ nonBatchRecords.computeIfAbsent(each.getTableName(), ignored -> new LinkedList<>()).add(each);
+ continue;
}
- mergedDataRecord.setTableName(dataRecord.getTableName());
- mergedDataRecord.setType(IngestDataChangeType.DELETE);
- dataRecords.remove(beforeDataRecord.getKey());
- dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
- } else {
- dataRecords.put(dataRecord.getOldKey(), dataRecord);
+ Map<String, List<DataRecord>> recordMap = batchDataRecords.computeIfAbsent(each.getTableName(), ignored -> new HashMap<>());
+ recordMap.computeIfAbsent(each.getType(), ignored -> new LinkedList<>()).add(each);
}
- }
-
- private boolean checkUpdatedPrimaryKey(final DataRecord dataRecord) {
- return RecordUtils.extractPrimaryColumns(dataRecord).stream().anyMatch(Column::isUpdated);
- }
-
- private DataRecord mergeColumn(final DataRecord preDataRecord, final DataRecord curDataRecord) {
- DataRecord result = new DataRecord(curDataRecord.getPosition(), curDataRecord.getColumnCount());
- for (int i = 0; i < curDataRecord.getColumnCount(); i++) {
- result.addColumn(new Column(
- curDataRecord.getColumn(i).getName(),
- preDataRecord.getColumn(i).isUniqueKey()
- ? mergePrimaryKeyOldValue(preDataRecord.getColumn(i), curDataRecord.getColumn(i))
- : null,
- curDataRecord.getColumn(i).getValue(),
- preDataRecord.getColumn(i).isUpdated() || curDataRecord.getColumn(i).isUpdated(),
- curDataRecord.getColumn(i).isUniqueKey()));
+ for (String each : tableNames) {
+ Map<String, List<DataRecord>> batchMap = batchDataRecords.getOrDefault(each, Collections.emptyMap());
+ List<DataRecord> nonBatchRecordMap = nonBatchRecords.getOrDefault(each, Collections.emptyList());
+ result.add(new GroupedDataRecord(each, batchMap.getOrDefault(IngestDataChangeType.INSERT, Collections.emptyList()),
+ batchMap.getOrDefault(IngestDataChangeType.UPDATE, Collections.emptyList()), batchMap.getOrDefault(IngestDataChangeType.DELETE, Collections.emptyList()), nonBatchRecordMap));
}
return result;
}
- private Object mergePrimaryKeyOldValue(final Column beforeColumn, final Column column) {
- if (beforeColumn.isUpdated()) {
- return beforeColumn.getOldValue();
- }
- return column.isUpdated() ? column.getOldValue() : null;
+ private Key getKeyFromDataRecord(final DataRecord dataRecord) {
+ return IngestDataChangeType.DELETE.equals(dataRecord.getType()) ? dataRecord.getOldKey() : dataRecord.getKey();
}
}
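
The rewritten group() makes two passes: the first marks keys that occur more than once, the second routes unique-key records into per-table, per-type batch buckets and duplicate-key records into an order-preserving non-batch list. A condensed, self-contained restatement with stand-in types (a sketch only, not the committed code; it omits the all-INSERT fast path shown above and collapses getKeyFromDataRecord, which in the committed code reads getOldKey() for DELETE records, into a single key() accessor):

    import java.util.*;

    final class GroupingSketch {

        record Rec(String table, String type, String key) {
        }

        record Grouped(String table, List<Rec> inserts, List<Rec> updates, List<Rec> deletes, List<Rec> nonBatch) {
        }

        static List<Grouped> group(final List<Rec> records) {
            // Pass 1: mark keys seen more than once; their records must keep arrival order.
            Map<String, Boolean> duplicateKey = new HashMap<>();
            Set<String> tables = new LinkedHashSet<>();
            for (Rec each : records) {
                tables.add(each.table());
                duplicateKey.merge(each.key(), false, (old, ignored) -> true);
            }
            // Pass 2: unique keys go to per-table, per-type batch buckets; duplicates go to the non-batch list.
            Map<String, Map<String, List<Rec>>> batch = new LinkedHashMap<>();
            Map<String, List<Rec>> nonBatch = new LinkedHashMap<>();
            for (Rec each : records) {
                if (duplicateKey.get(each.key())) {
                    nonBatch.computeIfAbsent(each.table(), k -> new LinkedList<>()).add(each);
                } else {
                    batch.computeIfAbsent(each.table(), k -> new HashMap<>())
                            .computeIfAbsent(each.type(), k -> new LinkedList<>()).add(each);
                }
            }
            List<Grouped> result = new ArrayList<>();
            for (String table : tables) {
                Map<String, List<Rec>> byType = batch.getOrDefault(table, Collections.emptyMap());
                result.add(new Grouped(table,
                        byType.getOrDefault("INSERT", Collections.emptyList()),
                        byType.getOrDefault("UPDATE", Collections.emptyList()),
                        byType.getOrDefault("DELETE", Collections.emptyList()),
                        nonBatch.getOrDefault(table, Collections.emptyList())));
            }
            return result;
        }
    }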
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 83692765b88..1ba4b49f1f5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -48,10 +48,13 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Default importer.
@@ -119,9 +122,10 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
}
List<GroupedDataRecord> result = MERGER.group(dataRecords);
for (GroupedDataRecord each : result) {
- flushInternal(dataSource, each.getDeleteDataRecords());
- flushInternal(dataSource, each.getInsertDataRecords());
- flushInternal(dataSource, each.getUpdateDataRecords());
+ flushInternal(dataSource, each.getBatchDeleteDataRecords());
+ flushInternal(dataSource, each.getBatchInsertDataRecords());
+ flushInternal(dataSource, each.getBatchUpdateDataRecords());
+ sequentialFlush(dataSource, each.getNonBatchRecords());
}
return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
}
@@ -130,15 +134,11 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
if (null == buffer || buffer.isEmpty()) {
return;
}
- try {
- tryFlush(dataSource, buffer);
- } catch (final SQLException ex) {
- throw new PipelineImporterJobWriteException(ex);
- }
+ tryFlush(dataSource, buffer);
}
@SneakyThrows(InterruptedException.class)
- private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) throws SQLException {
+ private void tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
for (int i = 0; isRunning() && i <= importerConfig.getRetryTimes(); i++) {
try {
doFlush(dataSource, buffer);
@@ -146,7 +146,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
} catch (final SQLException ex) {
log.error("flush failed {}/{} times.", i, importerConfig.getRetryTimes(), ex);
if (i == importerConfig.getRetryTimes()) {
- throw ex;
+ throw new PipelineImporterJobWriteException(ex);
}
Thread.sleep(Math.min(5 * 60 * 1000L, 1000L << i));
}
@@ -182,6 +182,30 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
}
}
+ private void doFlush(final Connection connection, final DataRecord each) throws SQLException {
+ switch (each.getType()) {
+ case IngestDataChangeType.INSERT:
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+ }
+ executeBatchInsert(connection, Collections.singletonList(each));
+ break;
+ case IngestDataChangeType.UPDATE:
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.UPDATE, 1);
+ }
+ executeUpdate(connection, each);
+ break;
+ case IngestDataChangeType.DELETE:
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.DELETE, 1);
+ }
+ executeBatchDelete(connection, Collections.singletonList(each));
+ break;
+ default:
+ }
+ }
+
private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
DataRecord dataRecord = dataRecords.get(0);
String insertSql = pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
@@ -243,16 +267,41 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
for (DataRecord each : dataRecords) {
conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
for (int i = 0; i < conditionColumns.size(); i++) {
- preparedStatement.setObject(i + 1, conditionColumns.get(i).getOldValue());
+ Object oldValue = conditionColumns.get(i).getOldValue();
+ if (null == oldValue) {
+ log.warn("Record old value is null, record={}", each);
+ }
+ preparedStatement.setObject(i + 1, oldValue);
}
preparedStatement.addBatch();
}
- preparedStatement.executeBatch();
+ int[] counts = preparedStatement.executeBatch();
+ if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
+ log.warn("batchDelete failed, counts={}, sql={}", Arrays.toString(counts), deleteSQL);
+ }
} finally {
batchDeleteStatement = null;
}
}
+ private void sequentialFlush(final DataSource dataSource, final List<DataRecord> buffer) {
+ if (buffer.isEmpty()) {
+ return;
+ }
+ try (Connection connection = dataSource.getConnection()) {
+ // TODO It would be better to use a transaction here, but on PostgreSQL a DELETE sometimes takes no effect when executed inside an open transaction
+ for (DataRecord each : buffer) {
+ try {
+ doFlush(connection, each);
+ } catch (final SQLException ex) {
+ throw new PipelineImporterJobWriteException(String.format("Write failed, record=%s", each), ex);
+ }
+ }
+ } catch (final SQLException ex) {
+ throw new PipelineImporterJobWriteException(ex);
+ }
+ }
+
@Override
protected void doStop() throws SQLException {
cancelStatement(batchInsertStatement);
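
A note on the retry path above: tryFlush no longer declares SQLException; the final failure is wrapped in PipelineImporterJobWriteException, and the sleep between attempts doubles per try with a five-minute cap:

    // Thread.sleep(Math.min(5 * 60 * 1000L, 1000L << i)) for i = 0, 1, 2, ...
    // yields 1s, 2s, 4s, 8s, ..., 256s, then stays capped at 300s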
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
index fa8c7c5ca33..93f6cc62a77 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
@@ -21,213 +21,143 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPo
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineUnexpectedDataRecordOrderException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
class DataRecordMergerTest {
private final DataRecordMerger dataRecordMerger = new DataRecordMerger();
- private DataRecord beforeDataRecord;
-
- private DataRecord afterDataRecord;
-
- private Collection<DataRecord> actual;
-
- @Test
- void assertInsertBeforeInsert() {
- beforeDataRecord = mockInsertDataRecord(1, 1, 1);
- afterDataRecord = mockInsertDataRecord(1, 1, 1);
- assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
- }
-
- @Test
- void assertUpdateBeforeInsert() {
- beforeDataRecord = mockUpdateDataRecord(1, 2, 2);
- afterDataRecord = mockInsertDataRecord(1, 1, 1);
- assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
- }
-
@Test
void assertDeleteBeforeInsert() {
- beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
- afterDataRecord = mockInsertDataRecord(1, 1, 1);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord beforeDataRecord = mockDeleteDataRecord(1, 2, 2);
+ DataRecord afterDataRecord = mockInsertDataRecord(1, 1, 1);
+ List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
- assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+ assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
}
@Test
void assertInsertBeforeUpdate() {
- beforeDataRecord = mockInsertDataRecord(1, 1, 1);
- afterDataRecord = mockUpdateDataRecord(1, 2, 2);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+ List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
- DataRecord dataRecord = actual.iterator().next();
- assertThat(dataRecord.getType(), is(IngestDataChangeType.INSERT));
- assertThat(dataRecord.getTableName(), is("order"));
- assertNull(dataRecord.getColumn(0).getOldValue());
- assertThat(dataRecord.getColumn(0).getValue(), is(1));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
}
@Test
void assertInsertBeforeUpdatePrimaryKey() {
- beforeDataRecord = mockInsertDataRecord(1, 1, 1);
- afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 1, 2, 2);
+ List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
- DataRecord dataRecord = actual.iterator().next();
- assertThat(dataRecord.getType(), is(IngestDataChangeType.INSERT));
- assertThat(dataRecord.getTableName(), is("order"));
- assertNull(dataRecord.getColumn(0).getOldValue());
- assertThat(dataRecord.getColumn(0).getValue(), is(2));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
}
@Test
void assertUpdateBeforeUpdate() {
- beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
- afterDataRecord = mockUpdateDataRecord(1, 2, 2);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2);
+ List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
- DataRecord dataRecord = actual.iterator().next();
- assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
- assertThat(dataRecord.getTableName(), is("order"));
- assertNull(dataRecord.getColumn(0).getOldValue());
- assertThat(dataRecord.getColumn(0).getValue(), is(1));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
}
@Test
void assertUpdateBeforeUpdatePrimaryKey() {
- beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
- afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+ DataRecord afterDataRecord = mockUpdateDataRecord(1, 2, 2, 2);
+ List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
- DataRecord dataRecord = actual.iterator().next();
- assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
- assertThat(dataRecord.getTableName(), is("order"));
- assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
- assertThat(dataRecord.getColumn(0).getValue(), is(2));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
- }
-
- @Test
- void assertUpdatePrimaryKeyBeforeUpdate() {
- beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
- afterDataRecord = mockUpdateDataRecord(2, 2, 2);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
- assertThat(actual.size(), is(1));
- DataRecord dataRecord = actual.iterator().next();
- assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
- assertThat(dataRecord.getTableName(), is("order"));
- assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
- assertThat(dataRecord.getColumn(0).getValue(), is(2));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
+ assertDataRecordsMatched(actual.iterator().next().getBatchUpdateDataRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
}
@Test
void assertUpdatePrimaryKeyBeforeUpdatePrimaryKey() {
- beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
- afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+ DataRecord afterDataRecord = mockUpdateDataRecord(2, 3, 2, 2);
+ List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
- DataRecord dataRecord = actual.iterator().next();
- assertThat(dataRecord.getType(), is(IngestDataChangeType.UPDATE));
- assertThat(dataRecord.getTableName(), is("order"));
- assertThat(dataRecord.getColumn(0).getOldValue(), is(1));
- assertThat(dataRecord.getColumn(0).getValue(), is(3));
- assertThat(dataRecord.getColumn(1).getValue(), is(2));
- assertThat(dataRecord.getColumn(2).getValue(), is(2));
- }
-
- @Test
- void assertDeleteBeforeUpdate() {
- beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
- afterDataRecord = mockUpdateDataRecord(1, 2, 2);
- assertThrows(UnsupportedSQLOperationException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+ assertThat(actual.get(0).getBatchUpdateDataRecords().size(), is(2));
+ GroupedDataRecord actualGroupedDataRecord = actual.get(0);
+ DataRecord actualFirstDataRecord = actualGroupedDataRecord.getBatchUpdateDataRecords().get(0);
+ assertThat(actualFirstDataRecord.getType(), is(IngestDataChangeType.UPDATE));
+ assertThat(actualFirstDataRecord.getTableName(), is("order"));
+ assertThat(actualFirstDataRecord.getColumn(0).getOldValue(), is(1));
+ assertThat(actualFirstDataRecord.getColumn(0).getValue(), is(2));
+ assertThat(actualFirstDataRecord.getColumn(1).getValue(), is(1));
+ assertThat(actualFirstDataRecord.getColumn(2).getValue(), is(1));
+ DataRecord actualSecondDataRecord = actualGroupedDataRecord.getBatchUpdateDataRecords().get(1);
+ assertThat(actualSecondDataRecord.getType(), is(IngestDataChangeType.UPDATE));
+ assertThat(actualSecondDataRecord.getTableName(), is("order"));
+ assertThat(actualSecondDataRecord.getColumn(0).getOldValue(), is(2));
+ assertThat(actualSecondDataRecord.getColumn(0).getValue(), is(3));
+ assertThat(actualSecondDataRecord.getColumn(1).getValue(), is(2));
+ assertThat(actualSecondDataRecord.getColumn(2).getValue(), is(2));
}
@Test
void assertInsertBeforeDelete() {
- beforeDataRecord = mockInsertDataRecord(1, 1, 1);
- afterDataRecord = mockDeleteDataRecord(1, 1, 1);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord beforeDataRecord = mockInsertDataRecord(1, 1, 1);
+ DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+ List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
- assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+ assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
}
@Test
void assertUpdateBeforeDelete() {
- beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
- afterDataRecord = mockDeleteDataRecord(1, 1, 1);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 1, 1);
+ DataRecord afterDataRecord = mockDeleteDataRecord(1, 1, 1);
+ List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
- assertThat(actual.iterator().next(), sameInstance(afterDataRecord));
+ assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
}
@Test
void assertUpdatePrimaryKeyBeforeDelete() {
- beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
- afterDataRecord = mockDeleteDataRecord(2, 1, 1);
- actual = dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord));
+ DataRecord beforeDataRecord = mockUpdateDataRecord(1, 2, 1, 1);
+ DataRecord afterDataRecord = mockDeleteDataRecord(2, 1, 1);
+ List<GroupedDataRecord> actual = dataRecordMerger.group(Arrays.asList(beforeDataRecord, afterDataRecord));
assertThat(actual.size(), is(1));
- DataRecord dataRecord = actual.iterator().next();
- assertThat(dataRecord.getType(), is(IngestDataChangeType.DELETE));
- assertThat(dataRecord.getTableName(), is("order"));
- assertNull(dataRecord.getColumn(0).getOldValue());
- assertThat(dataRecord.getColumn(0).getValue(), is(1));
- assertThat(dataRecord.getColumn(1).getValue(), is(1));
- assertThat(dataRecord.getColumn(2).getValue(), is(1));
+ assertDataRecordsMatched(actual.iterator().next().getNonBatchRecords(), Arrays.asList(beforeDataRecord, afterDataRecord));
}
- @Test
- void assertDeleteBeforeDelete() {
- beforeDataRecord = mockDeleteDataRecord(1, 1, 1);
- afterDataRecord = mockDeleteDataRecord(1, 1, 1);
- assertThrows(PipelineUnexpectedDataRecordOrderException.class, () -> dataRecordMerger.merge(Arrays.asList(beforeDataRecord, afterDataRecord)));
+ private void assertDataRecordsMatched(final List<DataRecord> actualRecords, final List<DataRecord> expectedRecords) {
+ for (int i = 0; i < actualRecords.size(); i++) {
+ assertThat(actualRecords.get(i), sameInstance(expectedRecords.get(i)));
+ }
}
@Test
void assertGroup() {
- List<DataRecord> dataRecords = mockDataRecords();
- List<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
- assertThat(groupedDataRecords.size(), is(2));
- assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
- assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
- assertThat(groupedDataRecords.get(0).getInsertDataRecords().size(), is(1));
- assertThat(groupedDataRecords.get(0).getUpdateDataRecords().size(), is(1));
- assertThat(groupedDataRecords.get(0).getDeleteDataRecords().size(), is(1));
- }
-
- private List<DataRecord> mockDataRecords() {
- return Arrays.asList(
+ List<DataRecord> dataRecords = Arrays.asList(
mockInsertDataRecord("t1", 1, 1, 1),
mockUpdateDataRecord("t1", 1, 2, 1),
mockUpdateDataRecord("t1", 1, 2, 2),
mockUpdateDataRecord("t1", 2, 1, 1),
mockUpdateDataRecord("t1", 2, 2, 1),
mockUpdateDataRecord("t1", 2, 2, 2),
+ mockInsertDataRecord("t1", 10, 10, 10),
mockDeleteDataRecord("t1", 3, 1, 1),
mockInsertDataRecord("t2", 1, 1, 1));
+ List<GroupedDataRecord> groupedDataRecords = dataRecordMerger.group(dataRecords);
+ assertThat(groupedDataRecords.size(), is(2));
+ assertThat(groupedDataRecords.get(0).getTableName(), is("t1"));
+ assertThat(groupedDataRecords.get(0).getBatchInsertDataRecords().size(), is(1));
+ assertThat(groupedDataRecords.get(0).getBatchUpdateDataRecords().size(), is(0));
+ assertThat(groupedDataRecords.get(0).getBatchDeleteDataRecords().size(), is(1));
+ assertThat(groupedDataRecords.get(0).getNonBatchRecords().size(), is(6));
+ assertThat(groupedDataRecords.get(1).getTableName(), is("t2"));
+ assertThat(groupedDataRecords.get(1).getBatchInsertDataRecords().size(), is(1));
}
private DataRecord mockInsertDataRecord(final int id, final int userId, final int totalPrice) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 7298b116ef0..19b9e2b09e9 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -101,20 +101,22 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
int randomInt = random.nextInt(-100, 100);
if (databaseType instanceof MySQLDatabaseType) {
String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), orderTableName, "?");
- log.info("update sql: {}", sql);
int randomUnsignedInt = random.nextInt(10, 100);
LocalDateTime now = LocalDateTime.now();
- DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
+ Object[] parameters = {"中文测试", randomInt, randomInt, randomInt, randomUnsignedInt, randomUnsignedInt, randomUnsignedInt,
randomUnsignedInt, 1.0F, 1.0, new BigDecimal("999"), now, now, now.toLocalDate(), now.toLocalTime(), Year.now().getValue() + 1, new byte[]{}, new byte[]{1, 2, -1, -3},
- "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId});
+ "D".getBytes(), "A".getBytes(), "T".getBytes(), "E", "text", "mediumText", "3", "3", PipelineCaseHelper.generateJsonString(32, true), orderId};
+ log.info("update sql: {}, params: {}", sql, parameters);
+ DataSourceExecuteUtils.execute(dataSource, sql, parameters);
return;
}
if (databaseType instanceof SchemaSupportedDatabaseType) {
String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), orderTableName, "?");
- log.info("update sql: {}", sql);
- DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
+ Object[] parameters = {"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
- LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId});
+ LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId};
+ log.info("update sql: {}, params: {}", sql, parameters);
+ DataSourceExecuteUtils.execute(dataSource, sql, parameters);
}
}
@@ -124,6 +126,7 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
private void deleteOrderById(final Object orderId) {
String sql = SQLBuilderUtils.buildDeleteSQL(orderTableName, "order_id");
+ log.info("delete sql: {}, params: {}", sql, orderId);
DataSourceExecuteUtils.execute(dataSource, sql, new Object[]{orderId});
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
index d5466ba5ee3..dc0140be0d5 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
@@ -106,6 +106,7 @@ class DataSourceImporterTest {
DataRecord deleteRecord = getDataRecord("DELETE");
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(), anyInt(), any())).thenReturn(mockRecords(deleteRecord));
+ when(preparedStatement.executeBatch()).thenReturn(new int[]{1});
jdbcImporter.run();
verify(preparedStatement).setObject(1, 1);
verify(preparedStatement).setObject(2, 10);