You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/04/19 05:37:50 UTC

[shardingsphere] branch master updated: Merge DataConsistencyChecker.checkCount() and checkContent() to check() (#16921)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e106a422d4 Merge DataConsistencyChecker.checkCount() and checkContent() to check() (#16921)
8e106a422d4 is described below

commit 8e106a422d467ea4d6a48536ad59742766238e3b
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Apr 19 13:37:44 2022 +0800

    Merge DataConsistencyChecker.checkCount() and checkContent() to check() (#16921)
    
    * Refactor RuleAlteredJobAPIImpl
    
    * Merge DataConsistencyChecker.checkCount() and checkContent() to check()
---
 .../core/api/impl/RuleAlteredJobAPIImpl.java       | 31 ++++++----------
 .../check/consistency/DataConsistencyChecker.java  | 33 ++++++++++-------
 .../consistency/DataConsistencyCheckerTest.java    | 42 +++++++---------------
 3 files changed, 44 insertions(+), 62 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 5dd6670b237..bb4de577c56 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -21,8 +21,6 @@ import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -274,7 +272,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
             log.info("DataConsistencyCalculatorAlgorithm is not configured, data consistency check is ignored.");
             return Collections.emptyMap();
         }
-        return dataConsistencyCheck0(jobConfig, ruleAlteredContext.getDataConsistencyCalculateAlgorithm());
+        return dataConsistencyCheck(jobConfig, ruleAlteredContext.getDataConsistencyCalculateAlgorithm());
     }
     
     @Override
@@ -283,29 +281,22 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
         JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
         verifyDataConsistencyCheck(jobConfig);
-        return dataConsistencyCheck0(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, new Properties()));
+        return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, new Properties()));
     }
     
-    private void verifyDataConsistencyCheck(final JobConfiguration jobConfig) {
-        verifyManualMode(jobConfig);
-        verifySourceWritingStopped(jobConfig);
-    }
-    
-    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck0(final JobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
+    private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final JobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
         String jobId = jobConfig.getHandleConfig().getJobId();
-        DataConsistencyChecker dataConsistencyChecker = new DataConsistencyChecker(jobConfig);
-        Map<String, DataConsistencyCountCheckResult> countCheckResult = dataConsistencyChecker.checkCount();
-        Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
-                ? dataConsistencyChecker.checkContent(calculator) : Collections.emptyMap();
-        log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getClass().getName(), countCheckResult);
-        Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>(countCheckResult.size());
-        for (Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
-            result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
-        }
+        Map<String, DataConsistencyCheckResult> result = new DataConsistencyChecker(jobConfig).check(calculator);
+        log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getType(), result);
         PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
         return result;
     }
     
+    private void verifyDataConsistencyCheck(final JobConfiguration jobConfig) {
+        verifyManualMode(jobConfig);
+        verifySourceWritingStopped(jobConfig);
+    }
+    
     @Override
     public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
         if (checkResults.isEmpty()) {
@@ -316,7 +307,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
             boolean isCountMatched = checkResult.getCountCheckResult().isMatched();
             boolean isContentMatched = checkResult.getContentCheckResult().isMatched();
             if (!isCountMatched || !isContentMatched) {
-                log.error("Scaling job: {}, table: {} data consistency check failed, countMatched: {}, contentMatched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
+                log.error("Scaling job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
                 return false;
             }
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index ab684880a6b..1b260719595 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -18,8 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import com.google.common.base.Preconditions;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
@@ -47,10 +47,12 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -63,7 +65,6 @@ import java.util.concurrent.TimeUnit;
 /**
  * Data consistency checker.
  */
-@Slf4j
 public final class DataConsistencyChecker {
     
     private final JobConfiguration jobConfig;
@@ -76,11 +77,23 @@ public final class DataConsistencyChecker {
     }
     
     /**
-     * Check whether each table's records count is valid.
-     *
-     * @return records count check result. key is logic table name, value is check result.
+     * Check data consistency.
+     * 
+     * @param calculator data consistency calculate algorithm
+     * @return checked result. key is logic table name, value is check result.
      */
-    public Map<String, DataConsistencyCountCheckResult> checkCount() {
+    public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculator) {
+        Map<String, DataConsistencyCountCheckResult> countCheckResult = checkCount();
+        Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
+                ? checkContent(calculator) : Collections.emptyMap();
+        Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>(countCheckResult.size());
+        for (Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
+            result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
+        }
+        return result;
+    }
+    
+    private Map<String, DataConsistencyCountCheckResult> checkCount() {
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-count-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
@@ -133,13 +146,7 @@ public final class DataConsistencyChecker {
         }
     }
     
-    /**
-     * Check whether each table's records content is valid.
-     *
-     * @param calculator data consistency calculate algorithm
-     * @return records content check result. key is logic table name, value is check result.
-     */
-    public Map<String, DataConsistencyContentCheckResult> checkContent(final DataConsistencyCalculateAlgorithm calculator) {
+    private Map<String, DataConsistencyContentCheckResult> checkContent(final DataConsistencyCalculateAlgorithm calculator) {
         Collection<String> supportedDatabaseTypes = calculator.getSupportedDatabaseTypes();
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                 jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index ad4bba6ff2f..9bf74d1caba 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -17,9 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCalculateAlgorithm;
@@ -29,14 +28,9 @@ import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import javax.sql.DataSource;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -51,36 +45,26 @@ public final class DataConsistencyCheckerTest {
     }
     
     @Test
-    public void assertCountAndDataCheck() {
+    public void assertCountAndDataCheck() throws SQLException {
+        Map<String, DataConsistencyCheckResult> actual = new DataConsistencyChecker(createJobConfiguration()).check(new FixtureDataConsistencyCalculateAlgorithm());
+        assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
+        assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
+        assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
+    }
+    
+    private JobConfiguration createJobConfiguration() throws SQLException {
         RuleAlteredJobContext jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration());
         initTableData(jobContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
         initTableData(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-        DataConsistencyChecker dataConsistencyChecker = new DataConsistencyChecker(jobContext.getJobConfig());
-        Map<String, DataConsistencyCountCheckResult> countCheckResults = dataConsistencyChecker.checkCount();
-        assertTrue(countCheckResults.get("t_order").isMatched());
-        assertThat(countCheckResults.get("t_order").getSourceRecordsCount(), is(countCheckResults.get("t_order").getTargetRecordsCount()));
-        Map<String, DataConsistencyContentCheckResult> contentCheckResults = dataConsistencyChecker.checkContent(new FixtureDataConsistencyCalculateAlgorithm());
-        assertTrue(contentCheckResults.get("t_order").isMatched());
+        return jobContext.getJobConfig();
     }
     
-    @SneakyThrows(SQLException.class)
-    private void initTableData(final PipelineDataSourceConfiguration dataSourceConfig) {
-        DataSource dataSource = new PipelineDataSourceManager().getDataSource(dataSourceConfig);
-        try (Connection connection = dataSource.getConnection();
+    private void initTableData(final PipelineDataSourceConfiguration dataSourceConfig) throws SQLException {
+        try (Connection connection = new PipelineDataSourceManager().getDataSource(dataSourceConfig).getConnection();
              Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
             statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
             statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
         }
     }
-    
-    @Test(expected = InvocationTargetException.class)
-    @SneakyThrows(ReflectiveOperationException.class)
-    public void assertCheckDatabaseTypeSupported() {
-        RuleAlteredJobContext jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration());
-        DataConsistencyChecker dataConsistencyChecker = new DataConsistencyChecker(jobContext.getJobConfig());
-        Method method = dataConsistencyChecker.getClass().getDeclaredMethod("checkDatabaseTypeSupportedOrNot", Collection.class, String.class);
-        method.setAccessible(true);
-        method.invoke(dataConsistencyChecker, Arrays.asList("MySQL", "PostgreSQL"), "H2");
-    }
 }