You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/04/19 10:52:09 UTC
[shardingsphere] branch master updated: Refactor DataConsistencyChecker (#16927)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ba7197320c8 Refactor DataConsistencyChecker (#16927)
ba7197320c8 is described below
commit ba7197320c8b7eb93f35b5a39cabe8ecf8ed803d
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Apr 19 18:52:02 2022 +0800
Refactor DataConsistencyChecker (#16927)
---
.../check/consistency/DataConsistencyChecker.java | 56 ++++++++++------------
1 file changed, 25 insertions(+), 31 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 1b260719595..236a6337a8a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfigu
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -94,17 +95,17 @@ public final class DataConsistencyChecker {
}
private Map<String, DataConsistencyCountCheckResult> checkCount() {
- ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-count-check-%d");
+ ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + jobConfig.getHandleConfig().getJobId() + "-count-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
+ Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(logicTableNames.size(), 1);
try (PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
- Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>();
for (String each : logicTableNames) {
- result.put(each, countCheck(each, sourceDataSource, targetDataSource, executor));
+ result.put(each, checkCount(each, sourceDataSource, targetDataSource, executor));
}
return result;
} catch (final SQLException ex) {
@@ -115,14 +116,7 @@ public final class DataConsistencyChecker {
}
}
- private String getJobIdPrefix(final String jobId) {
- if (jobId.length() <= 6) {
- return jobId;
- }
- return jobId.substring(0, 6);
- }
-
- private DataConsistencyCountCheckResult countCheck(
+ private DataConsistencyCountCheckResult checkCount(
final String table, final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final ThreadPoolExecutor executor) {
try {
Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, table, sourceDataSource.getDatabaseType()));
@@ -147,18 +141,12 @@ public final class DataConsistencyChecker {
}
private Map<String, DataConsistencyContentCheckResult> checkContent(final DataConsistencyCalculateAlgorithm calculator) {
- Collection<String> supportedDatabaseTypes = calculator.getSupportedDatabaseTypes();
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
- jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
- checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, sourceDataSourceConfig.getDatabaseType().getName());
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
- jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
- checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, targetDataSourceConfig.getDatabaseType().getName());
- addDataSourceConfigToMySQL(sourceDataSourceConfig, targetDataSourceConfig);
- Map<String, DataConsistencyContentCheckResult> result = new HashMap<>();
- ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-dataCheck-%d");
+ PipelineDataSourceConfiguration sourceDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getPipelineConfig().getSource());
+ PipelineDataSourceConfiguration targetDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getPipelineConfig().getTarget());
+ ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + jobConfig.getHandleConfig().getJobId() + "-data-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
+ Map<String, DataConsistencyContentCheckResult> result = new HashMap<>(logicTableNames.size(), 1);
try (PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobConfig.getWorkflowConfig().getSchemaName());
@@ -203,12 +191,27 @@ public final class DataConsistencyChecker {
return result;
}
- private void checkDatabaseTypeSupportedOrNot(final Collection<String> supportedDatabaseTypes, final String databaseType) {
+ private PipelineDataSourceConfiguration getPipelineDataSourceConfiguration(final DataConsistencyCalculateAlgorithm calculator, final YamlPipelineDataSourceConfiguration dataSourceConfig) {
+ PipelineDataSourceConfiguration result = PipelineDataSourceConfigurationFactory.newInstance(dataSourceConfig.getType(), dataSourceConfig.getParameter());
+ checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), result.getDatabaseType().getName());
+ addMySQLDataSourceConfig(result);
+ return result;
+ }
+
+ private void checkDatabaseTypeSupported(final Collection<String> supportedDatabaseTypes, final String databaseType) {
if (!supportedDatabaseTypes.contains(databaseType)) {
throw new PipelineDataConsistencyCheckFailedException("Database type " + databaseType + " is not supported in " + supportedDatabaseTypes);
}
}
+ private void addMySQLDataSourceConfig(final PipelineDataSourceConfiguration dataSourceConfig) {
+ if (dataSourceConfig.getDatabaseType().getName().equalsIgnoreCase(new MySQLDatabaseType().getName())) {
+ Properties queryProps = new Properties();
+ queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
+ dataSourceConfig.appendJDBCQueryProperties(queryProps);
+ }
+ }
+
private Map<String, TableMetaData> getTableMetaDataMap(final String schemaName) {
ContextManager contextManager = PipelineContext.getContextManager();
Preconditions.checkNotNull(contextManager, "ContextManager null");
@@ -219,15 +222,6 @@ public final class DataConsistencyChecker {
return metaData.getDefaultSchema().getTables();
}
- private void addDataSourceConfigToMySQL(final PipelineDataSourceConfiguration sourceDataSourceConfig, final PipelineDataSourceConfiguration targetDataSourceConfig) {
- if (sourceDataSourceConfig.getDatabaseType().getName().equalsIgnoreCase(new MySQLDatabaseType().getName())) {
- Properties queryProps = new Properties();
- queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
- sourceDataSourceConfig.appendJDBCQueryProperties(queryProps);
- targetDataSourceConfig.appendJDBCQueryProperties(queryProps);
- }
- }
-
private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource, final String tableName, final Collection<String> columnNames,
final String sourceDatabaseType, final String targetDatabaseType, final String uniqueKey) {
return new DataConsistencyCalculateParameter(sourceDataSource, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);