You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/04/19 10:52:09 UTC

[shardingsphere] branch master updated: Refactor DataConsistencyChecker (#16927)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ba7197320c8 Refactor DataConsistencyChecker (#16927)
ba7197320c8 is described below

commit ba7197320c8b7eb93f35b5a39cabe8ecf8ed803d
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Apr 19 18:52:02 2022 +0800

    Refactor DataConsistencyChecker (#16927)
---
 .../check/consistency/DataConsistencyChecker.java  | 56 ++++++++++------------
 1 file changed, 25 insertions(+), 31 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 1b260719595..236a6337a8a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfigu
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
@@ -94,17 +95,17 @@ public final class DataConsistencyChecker {
     }
     
     private Map<String, DataConsistencyCountCheckResult> checkCount() {
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-count-check-%d");
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + jobConfig.getHandleConfig().getJobId() + "-count-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
             jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
         PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
             jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
+        Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(logicTableNames.size(), 1);
         try (PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
              PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
-            Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>();
             for (String each : logicTableNames) {
-                result.put(each, countCheck(each, sourceDataSource, targetDataSource, executor));
+                result.put(each, checkCount(each, sourceDataSource, targetDataSource, executor));
             }
             return result;
         } catch (final SQLException ex) {
@@ -115,14 +116,7 @@ public final class DataConsistencyChecker {
         }
     }
     
-    private String getJobIdPrefix(final String jobId) {
-        if (jobId.length() <= 6) {
-            return jobId;
-        }
-        return jobId.substring(0, 6);
-    }
-    
-    private DataConsistencyCountCheckResult countCheck(
+    private DataConsistencyCountCheckResult checkCount(
             final String table, final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final ThreadPoolExecutor executor) {
         try {
             Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, table, sourceDataSource.getDatabaseType()));
@@ -147,18 +141,12 @@ public final class DataConsistencyChecker {
     }
     
     private Map<String, DataConsistencyContentCheckResult> checkContent(final DataConsistencyCalculateAlgorithm calculator) {
-        Collection<String> supportedDatabaseTypes = calculator.getSupportedDatabaseTypes();
-        PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
-                jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
-        checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, sourceDataSourceConfig.getDatabaseType().getName());
-        PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
-                jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
-        checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, targetDataSourceConfig.getDatabaseType().getName());
-        addDataSourceConfigToMySQL(sourceDataSourceConfig, targetDataSourceConfig);
-        Map<String, DataConsistencyContentCheckResult> result = new HashMap<>();
-        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-dataCheck-%d");
+        PipelineDataSourceConfiguration sourceDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getPipelineConfig().getSource());
+        PipelineDataSourceConfiguration targetDataSourceConfig = getPipelineDataSourceConfiguration(calculator, jobConfig.getPipelineConfig().getTarget());
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + jobConfig.getHandleConfig().getJobId() + "-data-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
+        Map<String, DataConsistencyContentCheckResult> result = new HashMap<>(logicTableNames.size(), 1);
         try (PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
              PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
             Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobConfig.getWorkflowConfig().getSchemaName());
@@ -203,12 +191,27 @@ public final class DataConsistencyChecker {
         return result;
     }
     
-    private void checkDatabaseTypeSupportedOrNot(final Collection<String> supportedDatabaseTypes, final String databaseType) {
+    private PipelineDataSourceConfiguration getPipelineDataSourceConfiguration(final DataConsistencyCalculateAlgorithm calculator, final YamlPipelineDataSourceConfiguration dataSourceConfig) {
+        PipelineDataSourceConfiguration result = PipelineDataSourceConfigurationFactory.newInstance(dataSourceConfig.getType(), dataSourceConfig.getParameter());
+        checkDatabaseTypeSupported(calculator.getSupportedDatabaseTypes(), result.getDatabaseType().getName());
+        addMySQLDataSourceConfig(result);
+        return result;
+    }
+    
+    private void checkDatabaseTypeSupported(final Collection<String> supportedDatabaseTypes, final String databaseType) {
         if (!supportedDatabaseTypes.contains(databaseType)) {
             throw new PipelineDataConsistencyCheckFailedException("Database type " + databaseType + " is not supported in " + supportedDatabaseTypes);
         }
     }
     
+    private void addMySQLDataSourceConfig(final PipelineDataSourceConfiguration dataSourceConfig) {
+        if (dataSourceConfig.getDatabaseType().getName().equalsIgnoreCase(new MySQLDatabaseType().getName())) {
+            Properties queryProps = new Properties();
+            queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
+            dataSourceConfig.appendJDBCQueryProperties(queryProps);
+        }
+    }
+    
     private Map<String, TableMetaData> getTableMetaDataMap(final String schemaName) {
         ContextManager contextManager = PipelineContext.getContextManager();
         Preconditions.checkNotNull(contextManager, "ContextManager null");
@@ -219,15 +222,6 @@ public final class DataConsistencyChecker {
         return metaData.getDefaultSchema().getTables();
     }
     
-    private void addDataSourceConfigToMySQL(final PipelineDataSourceConfiguration sourceDataSourceConfig, final PipelineDataSourceConfiguration targetDataSourceConfig) {
-        if (sourceDataSourceConfig.getDatabaseType().getName().equalsIgnoreCase(new MySQLDatabaseType().getName())) {
-            Properties queryProps = new Properties();
-            queryProps.setProperty("yearIsDateType", Boolean.FALSE.toString());
-            sourceDataSourceConfig.appendJDBCQueryProperties(queryProps);
-            targetDataSourceConfig.appendJDBCQueryProperties(queryProps);
-        }
-    }
-    
     private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource, final String tableName, final Collection<String> columnNames, 
                                                              final String sourceDatabaseType, final String targetDatabaseType, final String uniqueKey) {
         return new DataConsistencyCalculateParameter(sourceDataSource, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);