You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/03/21 04:10:36 UTC
[shardingsphere] branch master updated: Enable concurrent CRC32 match on source and target (#24708)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f3f46b90789 Enable concurrent CRC32 match on source and target (#24708)
f3f46b90789 is described below
commit f3f46b90789fba665e7abfcecb23d08c64104fe2
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Mar 21 12:10:28 2023 +0800
Enable concurrent CRC32 match on source and target (#24708)
---
.../SingleTableInventoryDataConsistencyChecker.java | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index b278267f059..3539f7c1b31 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -101,8 +101,8 @@ public final class SingleTableInventoryDataConsistencyChecker {
String targetTableName = targetTable.getTableName().getOriginal();
DataConsistencyCalculateParameter targetParam = buildParameter(targetDataSource, targetTable.getSchemaName().getOriginal(), targetTableName,
columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey, tableCheckPositions.get(targetTableName));
- Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParam).iterator();
- Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParam).iterator();
+ Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> calculateAlgorithm.calculate(sourceParam))).iterator();
+ Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> calculateAlgorithm.calculate(targetParam))).iterator();
try {
return check0(sourceCalculatedResults, targetCalculatedResults, executor);
// CHECKSTYLE:OFF
@@ -127,10 +127,8 @@ public final class SingleTableInventoryDataConsistencyChecker {
if (null != readRateLimitAlgorithm) {
readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
}
- Future<DataConsistencyCalculatedResult> sourceFuture = executor.submit(sourceCalculatedResults::next);
- Future<DataConsistencyCalculatedResult> targetFuture = executor.submit(targetCalculatedResults::next);
- DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(sourceFuture);
- DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(targetFuture);
+ DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
+ DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
targetRecordsCount += targetCalculatedResult.getRecordsCount();
contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);