You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/03/21 04:10:36 UTC

[shardingsphere] branch master updated: Enable concurrent CRC32 match on source and target (#24708)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f3f46b90789 Enable concurrent CRC32 match on source and target (#24708)
f3f46b90789 is described below

commit f3f46b90789fba665e7abfcecb23d08c64104fe2
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Mar 21 12:10:28 2023 +0800

    Enable concurrent CRC32 match on source and target (#24708)
---
 .../SingleTableInventoryDataConsistencyChecker.java            | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index b278267f059..3539f7c1b31 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -101,8 +101,8 @@ public final class SingleTableInventoryDataConsistencyChecker {
         String targetTableName = targetTable.getTableName().getOriginal();
         DataConsistencyCalculateParameter targetParam = buildParameter(targetDataSource, targetTable.getSchemaName().getOriginal(), targetTableName,
                 columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey, tableCheckPositions.get(targetTableName));
-        Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParam).iterator();
-        Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParam).iterator();
+        Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> calculateAlgorithm.calculate(sourceParam))).iterator();
+        Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> calculateAlgorithm.calculate(targetParam))).iterator();
         try {
             return check0(sourceCalculatedResults, targetCalculatedResults, executor);
             // CHECKSTYLE:OFF
@@ -127,10 +127,8 @@ public final class SingleTableInventoryDataConsistencyChecker {
             if (null != readRateLimitAlgorithm) {
                 readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
             }
-            Future<DataConsistencyCalculatedResult> sourceFuture = executor.submit(sourceCalculatedResults::next);
-            Future<DataConsistencyCalculatedResult> targetFuture = executor.submit(targetCalculatedResults::next);
-            DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(sourceFuture);
-            DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(targetFuture);
+            DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
+            DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
             sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
             targetRecordsCount += targetCalculatedResult.getRecordsCount();
             contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);