You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/04/19 05:37:50 UTC
[shardingsphere] branch master updated: Merge DataConsistencyChecker.checkCount() and checkContent() to check() (#16921)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8e106a422d4 Merge DataConsistencyChecker.checkCount() and checkContent() to check() (#16921)
8e106a422d4 is described below
commit 8e106a422d467ea4d6a48536ad59742766238e3b
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Apr 19 13:37:44 2022 +0800
Merge DataConsistencyChecker.checkCount() and checkContent() to check() (#16921)
* Refactor RuleAlteredJobAPIImpl
* Merge DataConsistencyChecker.checkCount() and checkContent() to check()
---
.../core/api/impl/RuleAlteredJobAPIImpl.java | 31 ++++++----------
.../check/consistency/DataConsistencyChecker.java | 33 ++++++++++-------
.../consistency/DataConsistencyCheckerTest.java | 42 +++++++---------------
3 files changed, 44 insertions(+), 62 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 5dd6670b237..bb4de577c56 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -21,8 +21,6 @@ import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -274,7 +272,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
log.info("DataConsistencyCalculatorAlgorithm is not configured, data consistency check is ignored.");
return Collections.emptyMap();
}
- return dataConsistencyCheck0(jobConfig, ruleAlteredContext.getDataConsistencyCalculateAlgorithm());
+ return dataConsistencyCheck(jobConfig, ruleAlteredContext.getDataConsistencyCalculateAlgorithm());
}
@Override
@@ -283,29 +281,22 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
JobConfiguration jobConfig = getJobConfig(getElasticJobConfigPOJO(jobId));
verifyDataConsistencyCheck(jobConfig);
- return dataConsistencyCheck0(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, new Properties()));
+ return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, new Properties()));
}
- private void verifyDataConsistencyCheck(final JobConfiguration jobConfig) {
- verifyManualMode(jobConfig);
- verifySourceWritingStopped(jobConfig);
- }
-
- private Map<String, DataConsistencyCheckResult> dataConsistencyCheck0(final JobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
+ private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final JobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
String jobId = jobConfig.getHandleConfig().getJobId();
- DataConsistencyChecker dataConsistencyChecker = new DataConsistencyChecker(jobConfig);
- Map<String, DataConsistencyCountCheckResult> countCheckResult = dataConsistencyChecker.checkCount();
- Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
- ? dataConsistencyChecker.checkContent(calculator) : Collections.emptyMap();
- log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getClass().getName(), countCheckResult);
- Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>(countCheckResult.size());
- for (Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
- result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
- }
+ Map<String, DataConsistencyCheckResult> result = new DataConsistencyChecker(jobConfig).check(calculator);
+ log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getType(), result);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
return result;
}
+ private void verifyDataConsistencyCheck(final JobConfiguration jobConfig) {
+ verifyManualMode(jobConfig);
+ verifySourceWritingStopped(jobConfig);
+ }
+
@Override
public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
if (checkResults.isEmpty()) {
@@ -316,7 +307,7 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
boolean isCountMatched = checkResult.getCountCheckResult().isMatched();
boolean isContentMatched = checkResult.getContentCheckResult().isMatched();
if (!isCountMatched || !isContentMatched) {
- log.error("Scaling job: {}, table: {} data consistency check failed, countMatched: {}, contentMatched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
+ log.error("Scaling job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
return false;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index ab684880a6b..1b260719595 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -18,8 +18,8 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import com.google.common.base.Preconditions;
-import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
@@ -47,10 +47,12 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
@@ -63,7 +65,6 @@ import java.util.concurrent.TimeUnit;
/**
* Data consistency checker.
*/
-@Slf4j
public final class DataConsistencyChecker {
private final JobConfiguration jobConfig;
@@ -76,11 +77,23 @@ public final class DataConsistencyChecker {
}
/**
- * Check whether each table's records count is valid.
- *
- * @return records count check result. key is logic table name, value is check result.
+ * Check data consistency.
+ *
+ * @param calculator data consistency calculate algorithm
+ * @return checked result. key is logic table name, value is check result.
*/
- public Map<String, DataConsistencyCountCheckResult> checkCount() {
+ public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculator) {
+ Map<String, DataConsistencyCountCheckResult> countCheckResult = checkCount();
+ Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
+ ? checkContent(calculator) : Collections.emptyMap();
+ Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>(countCheckResult.size());
+ for (Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
+ result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
+ }
+ return result;
+ }
+
+ private Map<String, DataConsistencyCountCheckResult> checkCount() {
ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-count-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
@@ -133,13 +146,7 @@ public final class DataConsistencyChecker {
}
}
- /**
- * Check whether each table's records content is valid.
- *
- * @param calculator data consistency calculate algorithm
- * @return records content check result. key is logic table name, value is check result.
- */
- public Map<String, DataConsistencyContentCheckResult> checkContent(final DataConsistencyCalculateAlgorithm calculator) {
+ private Map<String, DataConsistencyContentCheckResult> checkContent(final DataConsistencyCalculateAlgorithm calculator) {
Collection<String> supportedDatabaseTypes = calculator.getSupportedDatabaseTypes();
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index ad4bba6ff2f..9bf74d1caba 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -17,9 +17,8 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCalculateAlgorithm;
@@ -29,14 +28,9 @@ import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
import org.junit.BeforeClass;
import org.junit.Test;
-import javax.sql.DataSource;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
@@ -51,36 +45,26 @@ public final class DataConsistencyCheckerTest {
}
@Test
- public void assertCountAndDataCheck() {
+ public void assertCountAndDataCheck() throws SQLException {
+ Map<String, DataConsistencyCheckResult> actual = new DataConsistencyChecker(createJobConfiguration()).check(new FixtureDataConsistencyCalculateAlgorithm());
+ assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
+ assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
+ assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
+ }
+
+ private JobConfiguration createJobConfiguration() throws SQLException {
RuleAlteredJobContext jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration());
initTableData(jobContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
initTableData(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
- DataConsistencyChecker dataConsistencyChecker = new DataConsistencyChecker(jobContext.getJobConfig());
- Map<String, DataConsistencyCountCheckResult> countCheckResults = dataConsistencyChecker.checkCount();
- assertTrue(countCheckResults.get("t_order").isMatched());
- assertThat(countCheckResults.get("t_order").getSourceRecordsCount(), is(countCheckResults.get("t_order").getTargetRecordsCount()));
- Map<String, DataConsistencyContentCheckResult> contentCheckResults = dataConsistencyChecker.checkContent(new FixtureDataConsistencyCalculateAlgorithm());
- assertTrue(contentCheckResults.get("t_order").isMatched());
+ return jobContext.getJobConfig();
}
- @SneakyThrows(SQLException.class)
- private void initTableData(final PipelineDataSourceConfiguration dataSourceConfig) {
- DataSource dataSource = new PipelineDataSourceManager().getDataSource(dataSourceConfig);
- try (Connection connection = dataSource.getConnection();
+ private void initTableData(final PipelineDataSourceConfiguration dataSourceConfig) throws SQLException {
+ try (Connection connection = new PipelineDataSourceManager().getDataSource(dataSourceConfig).getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id VARCHAR(12))");
statement.execute("INSERT INTO t_order (order_id, user_id) VALUES (1, 'xxx'), (999, 'yyy')");
}
}
-
- @Test(expected = InvocationTargetException.class)
- @SneakyThrows(ReflectiveOperationException.class)
- public void assertCheckDatabaseTypeSupported() {
- RuleAlteredJobContext jobContext = new RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration());
- DataConsistencyChecker dataConsistencyChecker = new DataConsistencyChecker(jobContext.getJobConfig());
- Method method = dataConsistencyChecker.getClass().getDeclaredMethod("checkDatabaseTypeSupportedOrNot", Collection.class, String.class);
- method.setAccessible(true);
- method.invoke(dataConsistencyChecker, Arrays.asList("MySQL", "PostgreSQL"), "H2");
- }
}