You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/10/18 11:27:20 UTC

[shardingsphere] branch master updated: Make scaling source and target data consistency calculation concurrent (#13076)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ba99f7  Make scaling source and target data consistency calculation concurrent (#13076)
7ba99f7 is described below

commit 7ba99f7c4dd8fbfcbd394d92f4fd94f0a22f80f4
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Mon Oct 18 19:26:43 2021 +0800

    Make scaling source and target data consistency calculation concurrent (#13076)
    
    * Concurrent countCheck
    
    * Concurrent dataCheck
---
 .../consistency/DataConsistencyCheckerImpl.java    | 53 ++++++++++++++++------
 1 file changed, 40 insertions(+), 13 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
index eb6773a..a77d993 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
@@ -21,6 +21,7 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.scaling.core.api.SingleTableDataCalculator;
 import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceFactory;
@@ -41,6 +42,12 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -58,20 +65,29 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
     
     @Override
     public Map<String, DataConsistencyCheckResult> countCheck() {
-        return jobContext.getTaskConfigs()
-                .stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
-                .stream().collect(Collectors.toMap(Function.identity(), this::countCheck, (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + jobContext.getJobId() % 10_000 + "-countCheck-%d");
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+        try {
+            return jobContext.getTaskConfigs()
+                    .stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
+                    .stream().collect(Collectors.toMap(Function.identity(), table -> countCheck(table, executor), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+        } finally {
+            executor.shutdown();
+            executor.shutdownNow();
+        }
     }
     
-    private DataConsistencyCheckResult countCheck(final String table) {
+    private DataConsistencyCheckResult countCheck(final String table, final ThreadPoolExecutor executor) {
         ScalingDataSourceConfiguration sourceConfig = jobContext.getJobConfig().getRuleConfig().getSource().unwrap();
         ScalingDataSourceConfiguration targetConfig = jobContext.getJobConfig().getRuleConfig().getTarget().unwrap();
         try (DataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceConfig);
              DataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetConfig)) {
-            long sourceCount = count(sourceDataSource, table, sourceConfig.getDatabaseType());
-            long targetCount = count(targetDataSource, table, targetConfig.getDatabaseType());
+            Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, table, sourceConfig.getDatabaseType()));
+            Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, table, targetConfig.getDatabaseType()));
+            long sourceCount = sourceFuture.get();
+            long targetCount = targetFuture.get();
             return new DataConsistencyCheckResult(sourceCount, targetCount);
-        } catch (final SQLException ex) {
+        } catch (final SQLException | InterruptedException | ExecutionException ex) {
             throw new DataCheckFailException(String.format("table %s count check failed.", table), ex);
         }
     }
@@ -106,12 +122,23 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         SingleTableDataCalculator sourceCalculator = checkAlgorithm.getSingleTableDataCalculator(sourceConfig.getDatabaseType().getName());
         SingleTableDataCalculator targetCalculator = checkAlgorithm.getSingleTableDataCalculator(targetConfig.getDatabaseType().getName());
         Map<String, Boolean> result = new HashMap<>();
-        for (String each : logicTableNames) {
-            Collection<String> columnNames = tablesColumnNamesMap.get(each);
-            Object sourceCalculateResult = sourceCalculator.dataCalculate(sourceConfig, each, columnNames);
-            Object targetCalculateResult = targetCalculator.dataCalculate(targetConfig, each, columnNames);
-            boolean calculateResultsEquals = Objects.equals(sourceCalculateResult, targetCalculateResult);
-            result.put(each, calculateResultsEquals);
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + jobContext.getJobId() % 10_000 + "-dataCheck-%d");
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+        try {
+            for (String each : logicTableNames) {
+                Collection<String> columnNames = tablesColumnNamesMap.get(each);
+                Future<Object> sourceFuture = executor.submit(() -> sourceCalculator.dataCalculate(sourceConfig, each, columnNames));
+                Future<Object> targetFuture = executor.submit(() -> targetCalculator.dataCalculate(targetConfig, each, columnNames));
+                Object sourceCalculateResult = sourceFuture.get();
+                Object targetCalculateResult = targetFuture.get();
+                boolean calculateResultsEquals = Objects.equals(sourceCalculateResult, targetCalculateResult);
+                result.put(each, calculateResultsEquals);
+            }
+        } catch (final ExecutionException | InterruptedException ex) {
+            throw new DataCheckFailException("data check failed");
+        } finally {
+            executor.shutdown();
+            executor.shutdownNow();
         }
         return result;
     }