You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2021/10/18 11:27:20 UTC
[shardingsphere] branch master updated: Make scaling source and target data consistency calculation concurrent (#13076)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7ba99f7 Make scaling source and target data consistency calculation concurrent (#13076)
7ba99f7 is described below
commit 7ba99f7c4dd8fbfcbd394d92f4fd94f0a22f80f4
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Mon Oct 18 19:26:43 2021 +0800
Make scaling source and target data consistency calculation concurrent (#13076)
* Concurrent countCheck
* Concurrent dataCheck
---
.../consistency/DataConsistencyCheckerImpl.java | 53 ++++++++++++++++------
1 file changed, 40 insertions(+), 13 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
index eb6773a..a77d993 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/consistency/DataConsistencyCheckerImpl.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.scaling.core.api.SingleTableDataCalculator;
import org.apache.shardingsphere.scaling.core.common.datasource.DataSourceFactory;
@@ -41,6 +42,12 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -58,20 +65,29 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
@Override
public Map<String, DataConsistencyCheckResult> countCheck() {
- return jobContext.getTaskConfigs()
- .stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
- .stream().collect(Collectors.toMap(Function.identity(), this::countCheck, (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+ ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + jobContext.getJobId() % 10_000 + "-countCheck-%d");
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+ try {
+ return jobContext.getTaskConfigs()
+ .stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).collect(Collectors.toSet())
+ .stream().collect(Collectors.toMap(Function.identity(), table -> countCheck(table, executor), (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+ } finally {
+ executor.shutdown();
+ executor.shutdownNow();
+ }
}
- private DataConsistencyCheckResult countCheck(final String table) {
+ private DataConsistencyCheckResult countCheck(final String table, final ThreadPoolExecutor executor) {
ScalingDataSourceConfiguration sourceConfig = jobContext.getJobConfig().getRuleConfig().getSource().unwrap();
ScalingDataSourceConfiguration targetConfig = jobContext.getJobConfig().getRuleConfig().getTarget().unwrap();
try (DataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceConfig);
DataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetConfig)) {
- long sourceCount = count(sourceDataSource, table, sourceConfig.getDatabaseType());
- long targetCount = count(targetDataSource, table, targetConfig.getDatabaseType());
+ Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, table, sourceConfig.getDatabaseType()));
+ Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, table, targetConfig.getDatabaseType()));
+ long sourceCount = sourceFuture.get();
+ long targetCount = targetFuture.get();
return new DataConsistencyCheckResult(sourceCount, targetCount);
- } catch (final SQLException ex) {
+ } catch (final SQLException | InterruptedException | ExecutionException ex) {
throw new DataCheckFailException(String.format("table %s count check failed.", table), ex);
}
}
@@ -106,12 +122,23 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
SingleTableDataCalculator sourceCalculator = checkAlgorithm.getSingleTableDataCalculator(sourceConfig.getDatabaseType().getName());
SingleTableDataCalculator targetCalculator = checkAlgorithm.getSingleTableDataCalculator(targetConfig.getDatabaseType().getName());
Map<String, Boolean> result = new HashMap<>();
- for (String each : logicTableNames) {
- Collection<String> columnNames = tablesColumnNamesMap.get(each);
- Object sourceCalculateResult = sourceCalculator.dataCalculate(sourceConfig, each, columnNames);
- Object targetCalculateResult = targetCalculator.dataCalculate(targetConfig, each, columnNames);
- boolean calculateResultsEquals = Objects.equals(sourceCalculateResult, targetCalculateResult);
- result.put(each, calculateResultsEquals);
+ ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + jobContext.getJobId() % 10_000 + "-dataCheck-%d");
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+ try {
+ for (String each : logicTableNames) {
+ Collection<String> columnNames = tablesColumnNamesMap.get(each);
+ Future<Object> sourceFuture = executor.submit(() -> sourceCalculator.dataCalculate(sourceConfig, each, columnNames));
+ Future<Object> targetFuture = executor.submit(() -> targetCalculator.dataCalculate(targetConfig, each, columnNames));
+ Object sourceCalculateResult = sourceFuture.get();
+ Object targetCalculateResult = targetFuture.get();
+ boolean calculateResultsEquals = Objects.equals(sourceCalculateResult, targetCalculateResult);
+ result.put(each, calculateResultsEquals);
+ }
+ } catch (final ExecutionException | InterruptedException ex) {
+ throw new DataCheckFailException("data check failed");
+ } finally {
+ executor.shutdown();
+ executor.shutdownNow();
}
return result;
}