You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/04/19 01:52:43 UTC
[shardingsphere] branch master updated: Add DataConsistencyCountCheckResult and DataConsistencyContentCheckResult to avoid set field in DataConsistencyCheckResult (#16910)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d2a9db55f30 Add DataConsistencyCountCheckResult and DataConsistencyContentCheckResult to avoid set field in DataConsistencyCheckResult (#16910)
d2a9db55f30 is described below
commit d2a9db55f3003834c28be596572bbb1b47e9940c
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Tue Apr 19 09:52:38 2022 +0800
Add DataConsistencyCountCheckResult and DataConsistencyContentCheckResult to avoid set field in DataConsistencyCheckResult (#16910)
* Refactor DataConsistencyCheckResult
* Refactor DataConsistencyChecker
* Refactor CheckScalingQueryResultSet
* Refactor DataConsistencyContentCheckResult and DataConsistencyCountCheckResult
* Move packages
* Remove useless getter for DataConsistencyCheckerImpl
* Refactor DataConsistencyCheckerImpl
---
.../handler/query/CheckScalingQueryResultSet.java | 14 ++---
.../core/api/impl/RuleAlteredJobAPIImpl.java | 26 +++++----
.../check/consistency/DataConsistencyChecker.java | 9 +--
.../consistency/DataConsistencyCheckerImpl.java | 64 +++++++++------------
...StreamingDataConsistencyCalculateAlgorithm.java | 2 +-
...RC32MatchDataConsistencyCalculateAlgorithm.java | 2 +-
...DataMatchDataConsistencyCalculateAlgorithm.java | 2 +-
.../core/datasource/PipelineDataSourceFactory.java | 5 +-
.../core/datasource/PipelineDataSourceManager.java | 6 +-
.../memory}/MemoryPipelineChannelFactory.java | 4 +-
.../data/pipeline/core/job/FinishedCheckJob.java | 4 +-
.../scenario/rulealtered/RuleAlteredContext.java | 2 +-
.../job/environment/ScalingEnvironmentManager.java | 4 +-
...k.consistency.DataConsistencyCalculateAlgorithm | 4 +-
...eline.spi.ingest.channel.PipelineChannelFactory | 2 +-
...MatchDataConsistencyCalculateAlgorithmTest.java | 2 +-
.../fixture/FixturePipelineSQLBuilder.java | 2 +-
...k.consistency.DataConsistencyCalculateAlgorithm | 4 +-
...data.pipeline.spi.sqlbuilder.PipelineSQLBuilder | 2 +-
.../data/pipeline/api/RuleAlteredJobAPI.java | 4 +-
.../consistency/DataConsistencyCheckResult.java | 20 ++-----
...java => DataConsistencyContentCheckResult.java} | 22 ++------
...t.java => DataConsistencyCountCheckResult.java} | 14 ++---
.../test/mysql/env/ITEnvironmentContext.java | 3 +-
.../test/mysql/env/config/TargetConfiguration.java | 11 ----
.../core/api/impl/RuleAlteredJobAPIImplTest.java | 65 ++++++++++++++--------
.../DataConsistencyCheckerImplTest.java | 13 +++--
.../pipeline/core/util/PipelineContextUtil.java | 4 +-
.../scenario/rulealtered/RuleAlteredJobTest.java | 2 +-
29 files changed, 144 insertions(+), 174 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
index 5858521f6bd..dc5b5dee3f8 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/CheckScalingQueryResultSet.java
@@ -52,13 +52,13 @@ public final class CheckScalingQueryResultSet implements DistSQLResultSet {
}
data = checkResultMap.entrySet().stream()
.map(each -> {
- Collection<Object> list = new LinkedList<>();
- list.add(each.getKey());
- list.add(each.getValue().getSourceRecordsCount());
- list.add(each.getValue().getTargetRecordsCount());
- list.add(each.getValue().isRecordsCountMatched() + "");
- list.add(each.getValue().isRecordsContentMatched() + "");
- return list;
+ Collection<Object> result = new LinkedList<>();
+ result.add(each.getKey());
+ result.add(each.getValue().getCountCheckResult().getSourceRecordsCount());
+ result.add(each.getValue().getCountCheckResult().getTargetRecordsCount());
+ result.add(each.getValue().getCountCheckResult().isMatched() + "");
+ result.add(each.getValue().getContentCheckResult().isMatched() + "");
+ return result;
}).collect(Collectors.toList()).iterator();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index ed2b9cab1af..c64c62021ba 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.WorkflowConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -29,8 +31,8 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgo
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
@@ -293,24 +295,26 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
private Map<String, DataConsistencyCheckResult> dataConsistencyCheck0(final JobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
String jobId = jobConfig.getHandleConfig().getJobId();
DataConsistencyChecker dataConsistencyChecker = EnvironmentCheckerFactory.newInstance(jobConfig);
- Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.checkRecordsCount();
- if (result.values().stream().allMatch(DataConsistencyCheckResult::isRecordsCountMatched)) {
- Map<String, Boolean> contentCheckResult = dataConsistencyChecker.checkRecordsContent(calculator);
- result.forEach((key, value) -> value.setRecordsContentMatched(contentCheckResult.getOrDefault(key, false)));
+ Map<String, DataConsistencyCountCheckResult> countCheckResult = dataConsistencyChecker.checkCount();
+ Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
+ ? dataConsistencyChecker.checkContent(calculator) : Collections.emptyMap();
+ log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getClass().getName(), countCheckResult);
+ Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>(countCheckResult.size());
+ for (Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
+ result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
}
- log.info("Scaling job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getClass().getName(), result);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobCheckResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
return result;
}
@Override
- public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResultMap) {
- if (checkResultMap.isEmpty()) {
+ public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
+ if (checkResults.isEmpty()) {
return false;
}
- for (Entry<String, DataConsistencyCheckResult> entry : checkResultMap.entrySet()) {
- boolean recordsCountMatched = entry.getValue().isRecordsCountMatched();
- boolean recordsContentMatched = entry.getValue().isRecordsContentMatched();
+ for (Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
+ boolean recordsCountMatched = entry.getValue().getCountCheckResult().isMatched();
+ boolean recordsContentMatched = entry.getValue().getContentCheckResult().isMatched();
if (!recordsContentMatched || !recordsCountMatched) {
log.error("Scaling job: {}, table: {} data consistency check failed, recordsContentMatched: {}, recordsCountMatched: {}",
jobId, entry.getKey(), recordsContentMatched, recordsCountMatched);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 7b6ec52b6cc..904ea28e053 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -17,13 +17,14 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import java.util.Map;
/**
- * Data consistency checker interface.
+ * Data consistency checker.
*/
public interface DataConsistencyChecker {
@@ -32,7 +33,7 @@ public interface DataConsistencyChecker {
*
* @return records count check result. key is logic table name, value is check result.
*/
- Map<String, DataConsistencyCheckResult> checkRecordsCount();
+ Map<String, DataConsistencyCountCheckResult> checkCount();
/**
* Check whether each table's records content is valid.
@@ -40,5 +41,5 @@ public interface DataConsistencyChecker {
* @param calculator data consistency calculate algorithm
* @return records content check result. key is logic table name, value is check result.
*/
- Map<String, Boolean> checkRecordsContent(DataConsistencyCalculateAlgorithm calculator);
+ Map<String, DataConsistencyContentCheckResult> checkContent(DataConsistencyCalculateAlgorithm calculator);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index e4c69c65790..075d686b091 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -18,10 +18,10 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import com.google.common.base.Preconditions;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -31,7 +31,6 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -64,44 +63,35 @@ import java.util.concurrent.TimeUnit;
/**
* Data consistency checker implementation.
*/
-@Getter
@Slf4j
public final class DataConsistencyCheckerImpl implements DataConsistencyChecker {
- private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-
private final JobConfiguration jobConfig;
- private final RuleAlteredContext ruleAlteredContext;
-
- private final String jobId;
-
private final Collection<String> logicTableNames;
public DataConsistencyCheckerImpl(final JobConfiguration jobConfig) {
this.jobConfig = jobConfig;
- ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
- jobId = jobConfig.getHandleConfig().getJobId();
logicTableNames = jobConfig.getHandleConfig().splitLogicTableNames();
}
@Override
- public Map<String, DataConsistencyCheckResult> checkRecordsCount() {
- ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + getJobIdPrefix(jobId) + "-countCheck-%d");
+ public Map<String, DataConsistencyCountCheckResult> checkCount() {
+ ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-count-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
- try (PipelineDataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceDataSourceConfig);
- PipelineDataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetDataSourceConfig)) {
- Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
+ try (PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
+ PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
+ Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>();
for (String each : logicTableNames) {
result.put(each, countCheck(each, sourceDataSource, targetDataSource, executor));
}
return result;
} catch (final SQLException ex) {
- throw new PipelineDataConsistencyCheckFailedException("count check failed", ex);
+ throw new PipelineDataConsistencyCheckFailedException("Count check failed", ex);
} finally {
executor.shutdown();
executor.shutdownNow();
@@ -115,16 +105,16 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
return jobId.substring(0, 6);
}
- private DataConsistencyCheckResult countCheck(
+ private DataConsistencyCountCheckResult countCheck(
final String table, final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final ThreadPoolExecutor executor) {
try {
Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, table, sourceDataSource.getDatabaseType()));
Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, table, targetDataSource.getDatabaseType()));
long sourceCount = sourceFuture.get();
long targetCount = targetFuture.get();
- return new DataConsistencyCheckResult(sourceCount, targetCount);
+ return new DataConsistencyCountCheckResult(sourceCount, targetCount);
} catch (final InterruptedException | ExecutionException ex) {
- throw new PipelineDataConsistencyCheckFailedException(String.format("count check failed for table '%s'", table), ex);
+ throw new PipelineDataConsistencyCheckFailedException(String.format("Count check failed for table '%s'", table), ex);
}
}
@@ -135,12 +125,12 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
resultSet.next();
return resultSet.getLong(1);
} catch (final SQLException ex) {
- throw new PipelineDataConsistencyCheckFailedException(String.format("count for table '%s' failed", table), ex);
+ throw new PipelineDataConsistencyCheckFailedException(String.format("Count for table '%s' failed", table), ex);
}
}
@Override
- public Map<String, Boolean> checkRecordsContent(final DataConsistencyCalculateAlgorithm dataConsistencyCalculator) {
+ public Map<String, DataConsistencyContentCheckResult> checkContent(final DataConsistencyCalculateAlgorithm dataConsistencyCalculator) {
Collection<String> supportedDatabaseTypes = dataConsistencyCalculator.getSupportedDatabaseTypes();
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
jobConfig.getPipelineConfig().getSource().getType(), jobConfig.getPipelineConfig().getSource().getParameter());
@@ -149,17 +139,17 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
jobConfig.getPipelineConfig().getTarget().getType(), jobConfig.getPipelineConfig().getTarget().getParameter());
checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, targetDataSourceConfig.getDatabaseType().getName());
addDataSourceConfigToMySQL(sourceDataSourceConfig, targetDataSourceConfig);
- Map<String, Boolean> result = new HashMap<>();
- ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + getJobIdPrefix(jobId) + "-dataCheck-%d");
+ Map<String, DataConsistencyContentCheckResult> result = new HashMap<>();
+ ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job" + getJobIdPrefix(jobConfig.getHandleConfig().getJobId()) + "-dataCheck-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- JobRateLimitAlgorithm inputRateLimitAlgorithm = ruleAlteredContext.getInputRateLimitAlgorithm();
- try (PipelineDataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceDataSourceConfig);
- PipelineDataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetDataSourceConfig)) {
+ JobRateLimitAlgorithm inputRateLimitAlgorithm = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig).getInputRateLimitAlgorithm();
+ try (PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
+ PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
Map<String, TableMetaData> tableMetaDataMap = getTableMetaDataMap(jobConfig.getWorkflowConfig().getSchemaName());
logicTableNames.forEach(each -> {
//TODO put to preparer
if (!tableMetaDataMap.containsKey(each)) {
- throw new PipelineDataConsistencyCheckFailedException(String.format("could not get metadata for table '%s'", each));
+ throw new PipelineDataConsistencyCheckFailedException(String.format("Could not get metadata for table '%s'", each));
}
});
String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getName();
@@ -172,7 +162,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
DataConsistencyCalculateParameter targetParameter = buildParameter(targetDataSource, each, columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
Iterator<Object> sourceCalculatedResults = dataConsistencyCalculator.calculate(sourceParameter).iterator();
Iterator<Object> targetCalculatedResults = dataConsistencyCalculator.calculate(targetParameter).iterator();
- boolean calculateResultsEquals = true;
+ boolean contentMatched = true;
while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
if (null != inputRateLimitAlgorithm) {
inputRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
@@ -181,15 +171,15 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
Future<Object> targetFuture = executor.submit(targetCalculatedResults::next);
Object sourceCalculatedResult = sourceFuture.get();
Object targetCalculatedResult = targetFuture.get();
- calculateResultsEquals = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
- if (!calculateResultsEquals) {
+ contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
+ if (!contentMatched) {
break;
}
}
- result.put(each, calculateResultsEquals);
+ result.put(each, new DataConsistencyContentCheckResult(contentMatched));
}
} catch (final ExecutionException | InterruptedException | SQLException ex) {
- throw new PipelineDataConsistencyCheckFailedException("data check failed", ex);
+ throw new PipelineDataConsistencyCheckFailedException("Data check failed", ex);
} finally {
executor.shutdown();
executor.shutdownNow();
@@ -199,16 +189,16 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
private void checkDatabaseTypeSupportedOrNot(final Collection<String> supportedDatabaseTypes, final String databaseType) {
if (!supportedDatabaseTypes.contains(databaseType)) {
- throw new PipelineDataConsistencyCheckFailedException("database type " + databaseType + " is not supported in " + supportedDatabaseTypes);
+ throw new PipelineDataConsistencyCheckFailedException("Database type " + databaseType + " is not supported in " + supportedDatabaseTypes);
}
}
private Map<String, TableMetaData> getTableMetaDataMap(final String schemaName) {
ContextManager contextManager = PipelineContext.getContextManager();
- Preconditions.checkNotNull(contextManager, "contextManager null");
+ Preconditions.checkNotNull(contextManager, "ContextManager null");
ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData(schemaName);
if (null == metaData) {
- throw new RuntimeException("Can not get metaData by schemaName " + schemaName);
+ throw new RuntimeException("Can not get meta data by schema name " + schemaName);
}
return metaData.getDefaultSchema().getTables();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingDataConsistencyCalculateAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
similarity index 97%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingDataConsistencyCalculateAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index 9c0987533d2..d866a90a456 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
similarity index 97%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index 8128b6cda49..0adf19036aa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCalculateAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
similarity index 99%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCalculateAlgorithm.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index bf69453c271..a00b0ab406c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
import lombok.Getter;
import lombok.NonNull;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
index 2109afc1dd8..e9ff3c18602 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceFactory.java
@@ -17,6 +17,8 @@
package org.apache.shardingsphere.data.pipeline.core.datasource;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -28,6 +30,7 @@ import java.sql.SQLException;
/**
* Pipeline data source factory.
*/
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineDataSourceFactory {
/**
@@ -37,7 +40,7 @@ public final class PipelineDataSourceFactory {
* @return new data source wrapper
*/
@SneakyThrows(SQLException.class)
- public PipelineDataSourceWrapper newInstance(final PipelineDataSourceConfiguration dataSourceConfig) {
+ public static PipelineDataSourceWrapper newInstance(final PipelineDataSourceConfiguration dataSourceConfig) {
DataSource pipelineDataSource = PipelineDataSourceCreatorFactory.getInstance(dataSourceConfig.getType()).createPipelineDataSource(dataSourceConfig.getDataSourceConfiguration());
return new PipelineDataSourceWrapper(pipelineDataSource, dataSourceConfig.getDatabaseType());
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
index 2754bb73b0c..8659174fe4b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.datasource;
-import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -29,12 +28,9 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* Pipeline data source manager.
*/
-@NoArgsConstructor
@Slf4j
public final class PipelineDataSourceManager implements AutoCloseable {
- private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-
private final Map<PipelineDataSourceConfiguration, PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
/**
@@ -53,7 +49,7 @@ public final class PipelineDataSourceManager implements AutoCloseable {
if (null != result) {
return result;
}
- result = dataSourceFactory.newInstance(dataSourceConfig);
+ result = PipelineDataSourceFactory.newInstance(dataSourceConfig);
cachedDataSources.put(dataSourceConfig, result);
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelFactory.java
similarity index 88%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelFactory.java
index 01c69f94dfc..2f119b90882 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/ingest/channel/MemoryPipelineChannelFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelFactory.java
@@ -15,15 +15,13 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel;
+package org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory;
import com.google.common.base.Strings;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import java.util.Properties;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 07311cda059..d09647b5969 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -113,8 +112,7 @@ public final class FinishedCheckJob implements SimpleJob {
private boolean dataConsistencyCheck(final JobConfiguration jobConfig) {
String jobId = jobConfig.getHandleConfig().getJobId();
log.info("dataConsistencyCheck for job {}", jobId);
- Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobConfig);
- return ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap);
+ return ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, ruleAlteredJobAPI.dataConsistencyCheck(jobConfig));
}
private void switchClusterConfiguration(final String schemaName, final JobConfiguration jobConfig, final RuleBasedJobLockAlgorithm checkoutLockAlgorithm) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index b188c4a3771..b03a1e92140 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
index bcb6b3c3b7e..772ba8e478b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/environment/ScalingEnvironmentManager.java
@@ -36,8 +36,6 @@ import java.util.Collection;
@Slf4j
public final class ScalingEnvironmentManager {
- private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
-
/**
* Cleanup target tables.
*
@@ -49,7 +47,7 @@ public final class ScalingEnvironmentManager {
Collection<String> tables = jobConfig.getHandleConfig().splitLogicTableNames();
log.info("cleanupTargetTables, tables={}", tables);
YamlPipelineDataSourceConfiguration target = jobConfig.getPipelineConfig().getTarget();
- try (PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
+ try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(target.getType(), target.getParameter()));
Connection connection = dataSource.getConnection()) {
for (String each : tables) {
String sql = PipelineSQLBuilderFactory.newInstance(jobConfig.getHandleConfig().getTargetDatabaseType()).buildTruncateSQL(each);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
index b095d5571b0..d122b3da26d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
@@ -15,5 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCalculateAlgorithm
-org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchDataConsistencyCalculateAlgorithm
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.CRC32MatchDataConsistencyCalculateAlgorithm
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
index 7c0c3d65284..976e49b8443 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory
+org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelFactory
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithmTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
similarity index 97%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index 791b7d223c7..0fd9da9f68b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/spi/check/consistency/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/fixture/FixturePipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
similarity index 96%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index 82b127e7baa..d76a21aaf8e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/fixture/FixturePipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.fixture;
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.fixture;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
index b095d5571b0..d122b3da26d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm
@@ -15,5 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.CRC32MatchDataConsistencyCalculateAlgorithm
-org.apache.shardingsphere.data.pipeline.core.spi.check.consistency.DataMatchDataConsistencyCalculateAlgorithm
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.CRC32MatchDataConsistencyCalculateAlgorithm
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index df902547e48..ab29ad1625d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.fixture.FixturePipelineSQLBuilder
+org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.fixture.FixturePipelineSQLBuilder
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
index 8a31e1ff43b..16e0d0d7253 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
@@ -147,10 +147,10 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
* Aggregate data consistency check results.
*
* @param jobId job id
- * @param checkResultMap check result map
+ * @param checkResults check results
* @return check success or not
*/
- boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResultMap);
+ boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResults);
/**
* Switch job source schema's configuration to job target configuration.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
index 895ecac0c7b..04452249d44 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
@@ -18,28 +18,18 @@
package org.apache.shardingsphere.data.pipeline.api.check.consistency;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
import lombok.ToString;
/**
* Data consistency check result.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
-@ToString
+@ToString(callSuper = true)
public final class DataConsistencyCheckResult {
- private final long sourceRecordsCount;
+ private final DataConsistencyCountCheckResult countCheckResult;
- private final long targetRecordsCount;
-
- private final boolean recordsCountMatched;
-
- private boolean recordsContentMatched;
-
- public DataConsistencyCheckResult(final long sourceRecordsCount, final long targetRecordsCount) {
- this.sourceRecordsCount = sourceRecordsCount;
- this.targetRecordsCount = targetRecordsCount;
- recordsCountMatched = sourceRecordsCount == targetRecordsCount;
- }
+ private final DataConsistencyContentCheckResult contentCheckResult;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyContentCheckResult.java
similarity index 61%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyContentCheckResult.java
index 895ecac0c7b..5e87aad5999 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyContentCheckResult.java
@@ -18,28 +18,16 @@
package org.apache.shardingsphere.data.pipeline.api.check.consistency;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
import lombok.ToString;
/**
- * Data consistency check result.
+ * Data consistency content check result.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
@ToString
-public final class DataConsistencyCheckResult {
+public final class DataConsistencyContentCheckResult {
- private final long sourceRecordsCount;
-
- private final long targetRecordsCount;
-
- private final boolean recordsCountMatched;
-
- private boolean recordsContentMatched;
-
- public DataConsistencyCheckResult(final long sourceRecordsCount, final long targetRecordsCount) {
- this.sourceRecordsCount = sourceRecordsCount;
- this.targetRecordsCount = targetRecordsCount;
- recordsCountMatched = sourceRecordsCount == targetRecordsCount;
- }
+ private final boolean matched;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCountCheckResult.java
similarity index 75%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCountCheckResult.java
index 895ecac0c7b..d166dba8a1d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCountCheckResult.java
@@ -18,28 +18,24 @@
package org.apache.shardingsphere.data.pipeline.api.check.consistency;
import lombok.Getter;
-import lombok.Setter;
import lombok.ToString;
/**
- * Data consistency check result.
+ * Data consistency count check result.
*/
@Getter
-@Setter
@ToString
-public final class DataConsistencyCheckResult {
+public final class DataConsistencyCountCheckResult {
private final long sourceRecordsCount;
private final long targetRecordsCount;
- private final boolean recordsCountMatched;
+ private final boolean matched;
- private boolean recordsContentMatched;
-
- public DataConsistencyCheckResult(final long sourceRecordsCount, final long targetRecordsCount) {
+ public DataConsistencyCountCheckResult(final long sourceRecordsCount, final long targetRecordsCount) {
this.sourceRecordsCount = sourceRecordsCount;
this.targetRecordsCount = targetRecordsCount;
- recordsCountMatched = sourceRecordsCount == targetRecordsCount;
+ matched = sourceRecordsCount == targetRecordsCount;
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
index 4001d83e04b..bf2fbe1a44c 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/ITEnvironmentContext.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfigu
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.integration.scaling.test.mysql.env.cases.DataSet;
import org.apache.shardingsphere.integration.scaling.test.mysql.env.cases.Type;
import org.apache.shardingsphere.integration.scaling.test.mysql.env.config.SourceConfiguration;
@@ -59,7 +60,7 @@ public final class ITEnvironmentContext {
Map<String, YamlTableRuleConfiguration> sourceTableRules = createSourceTableRules();
scalingConfiguration = createScalingConfiguration(sourceTableRules);
sourceDataSource = SourceConfiguration.createHostDataSource(sourceTableRules);
- targetDataSource = TargetConfiguration.createHostDataSource();
+ targetDataSource = PipelineDataSourceFactory.newInstance(TargetConfiguration.getHostConfiguration());
}
@SneakyThrows
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/config/TargetConfiguration.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/config/TargetConfiguration.java
index 8773569d661..ad78c1f7f5c 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/config/TargetConfiguration.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/scaling/test/mysql/env/config/TargetConfiguration.java
@@ -18,10 +18,8 @@
package org.apache.shardingsphere.integration.scaling.test.mysql.env.config;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.integration.scaling.test.mysql.env.IntegrationTestEnvironment;
-import javax.sql.DataSource;
import java.util.Properties;
/**
@@ -54,13 +52,4 @@ public final class TargetConfiguration {
private static StandardPipelineDataSourceConfiguration getConfiguration(final String host) {
return new StandardPipelineDataSourceConfiguration(String.format(TARGET_JDBC_URL, host), ENGINE_ENV_PROPS.getProperty("db.username"), ENGINE_ENV_PROPS.getProperty("db.password"));
}
-
- /**
- * Create host standard pipeline data source.
- *
- * @return data source
- */
- public static DataSource createHostDataSource() {
- return new PipelineDataSourceFactory().newInstance(getHostConfiguration());
- }
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 4081ceb976a..54a339c5f41 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -17,11 +17,12 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
-import com.google.common.collect.ImmutableMap;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.PipelineConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -44,6 +45,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -145,31 +147,46 @@ public final class RuleAlteredJobAPIImplTest {
Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE");
ruleAlteredJobAPI.restoreClusterWriteDB(schemaName, jobId.get());
assertThat(checkResultMap.size(), is(1));
- assertTrue(checkResultMap.get("t_order").isRecordsCountMatched());
- assertTrue(checkResultMap.get("t_order").isRecordsContentMatched());
- assertThat(checkResultMap.get("t_order").getTargetRecordsCount(), is(2L));
+ assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
+ assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(2L));
+ assertTrue(checkResultMap.get("t_order").getContentCheckResult().isMatched());
}
@Test
- public void assertAggregateDataConsistencyCheckResults() {
- String jobId = "1";
- Map<String, DataConsistencyCheckResult> checkResultMap;
- checkResultMap = Collections.emptyMap();
- assertThat(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap), is(false));
- DataConsistencyCheckResult trueResult = new DataConsistencyCheckResult(1, 1);
- trueResult.setRecordsContentMatched(true);
- DataConsistencyCheckResult checkResult;
- checkResult = new DataConsistencyCheckResult(100, 95);
- checkResultMap = ImmutableMap.<String, DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order", checkResult).build();
- assertThat(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap), is(false));
- checkResult = new DataConsistencyCheckResult(100, 100);
- checkResult.setRecordsContentMatched(false);
- checkResultMap = ImmutableMap.<String, DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order", checkResult).build();
- assertThat(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap), is(false));
- checkResult = new DataConsistencyCheckResult(100, 100);
- checkResult.setRecordsContentMatched(true);
- checkResultMap = ImmutableMap.<String, DataConsistencyCheckResult>builder().put("t", trueResult).put("t_order", checkResult).build();
- assertThat(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults(jobId, checkResultMap), is(true));
+ public void assertAggregateEmptyDataConsistencyCheckResults() {
+ // No per-table results are supplied; the aggregate is expected to be false.
+ Map<String, DataConsistencyCheckResult> noResults = Collections.emptyMap();
+ assertFalse(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", noResults));
+ }
+
+ @Test
+ public void assertAggregateDifferentCountDataConsistencyCheckResults() {
+ // Content is matched for both tables, so the count mismatch on bar_tbl is the only thing that can make the aggregate false.
+ DataConsistencyCountCheckResult equalCountCheckResult = new DataConsistencyCountCheckResult(100, 100);
+ DataConsistencyCountCheckResult notEqualCountCheckResult = new DataConsistencyCountCheckResult(100, 95);
+ DataConsistencyContentCheckResult equalContentCheckResult = new DataConsistencyContentCheckResult(true);
+ Map<String, DataConsistencyCheckResult> checkResults = new LinkedHashMap<>(2, 1);
+ checkResults.put("foo_tbl", new DataConsistencyCheckResult(equalCountCheckResult, equalContentCheckResult));
+ checkResults.put("bar_tbl", new DataConsistencyCheckResult(notEqualCountCheckResult, equalContentCheckResult));
+ assertFalse(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults));
+ }
+
+ @Test
+ public void assertAggregateDifferentContentDataConsistencyCheckResults() {
+ // Both tables share the (100, 100) count result; only bar_tbl carries a content result built with false.
+ DataConsistencyCountCheckResult matchedCount = new DataConsistencyCountCheckResult(100, 100);
+ Map<String, DataConsistencyCheckResult> tableResults = new LinkedHashMap<>(2, 1);
+ tableResults.put("foo_tbl", new DataConsistencyCheckResult(matchedCount, new DataConsistencyContentCheckResult(true)));
+ tableResults.put("bar_tbl", new DataConsistencyCheckResult(matchedCount, new DataConsistencyContentCheckResult(false)));
+ boolean aggregated = ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", tableResults);
+ assertFalse(aggregated);
+ }
+
+ @Test
+ public void assertAggregateSameDataConsistencyCheckResults() {
+ // Every table gets its own result wrapping the same (100, 100) count result and the same content result built with true.
+ DataConsistencyCountCheckResult matchedCount = new DataConsistencyCountCheckResult(100, 100);
+ DataConsistencyContentCheckResult matchedContent = new DataConsistencyContentCheckResult(true);
+ Map<String, DataConsistencyCheckResult> tableResults = new LinkedHashMap<>(2, 1);
+ for (String each : new String[] {"foo_tbl", "bar_tbl"}) {
+ tableResults.put(each, new DataConsistencyCheckResult(matchedCount, matchedContent));
+ }
+ assertTrue(ruleAlteredJobAPI.aggregateDataConsistencyCheckResults("foo_job", tableResults));
}
@Test(expected = PipelineVerifyFailedException.class)
@@ -214,7 +231,7 @@ public final class RuleAlteredJobAPIImplTest {
ruleAlteredJobAPI.stop(jobId.get());
ruleAlteredJobAPI.reset(jobId.get());
Map<String, DataConsistencyCheckResult> checkResultMap = ruleAlteredJobAPI.dataConsistencyCheck(jobConfig);
- assertThat(checkResultMap.get("t_order").getTargetRecordsCount(), is(0L));
+ assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(0L));
}
@SneakyThrows(SQLException.class)
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
index 5018da801b7..df560ed78f4 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImplTest.java
@@ -18,7 +18,8 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureDataConsistencyCalculateAlgorithm;
@@ -56,11 +57,11 @@ public final class DataConsistencyCheckerImplTest {
initTableData(jobContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
initTableData(jobContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
DataConsistencyChecker dataConsistencyChecker = EnvironmentCheckerFactory.newInstance(jobContext.getJobConfig());
- Map<String, DataConsistencyCheckResult> resultMap = dataConsistencyChecker.checkRecordsCount();
- assertTrue(resultMap.get("t_order").isRecordsCountMatched());
- assertThat(resultMap.get("t_order").getSourceRecordsCount(), is(resultMap.get("t_order").getTargetRecordsCount()));
- Map<String, Boolean> dataCheckResultMap = dataConsistencyChecker.checkRecordsContent(new FixtureDataConsistencyCalculateAlgorithm());
- assertTrue(dataCheckResultMap.get("t_order"));
+ Map<String, DataConsistencyCountCheckResult> countCheckResults = dataConsistencyChecker.checkCount();
+ assertTrue(countCheckResults.get("t_order").isMatched());
+ assertThat(countCheckResults.get("t_order").getSourceRecordsCount(), is(countCheckResults.get("t_order").getTargetRecordsCount()));
+ Map<String, DataConsistencyContentCheckResult> contentCheckResults = dataConsistencyChecker.checkContent(new FixtureDataConsistencyCalculateAlgorithm());
+ assertTrue(contentCheckResults.get("t_order").isMatched());
}
@SneakyThrows(SQLException.class)
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
index bb1ad78d0c7..91cbb4f749d 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineContextUtil.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.fixture.EmbedTestingServer;
-import org.apache.shardingsphere.data.pipeline.core.spi.ingest.channel.MemoryPipelineChannelFactory;
+import org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelFactory;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -94,7 +94,7 @@ public final class PipelineContextUtil {
}
ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig = new ShardingSpherePipelineDataSourceConfiguration(
ConfigurationFileUtil.readFile("config_sharding_sphere_jdbc_source.yaml"));
- ShardingSphereDataSource shardingSphereDataSource = (ShardingSphereDataSource) new PipelineDataSourceFactory().newInstance(pipelineDataSourceConfig).getDataSource();
+ ShardingSphereDataSource shardingSphereDataSource = (ShardingSphereDataSource) PipelineDataSourceFactory.newInstance(pipelineDataSourceConfig).getDataSource();
ContextManager contextManager = shardingSphereDataSource.getContextManager();
MetaDataPersistService metaDataPersistService = new MetaDataPersistService(getClusterPersistRepository());
MetaDataContexts metaDataContexts = renewMetaDataContexts(contextManager.getMetaDataContexts(), metaDataPersistService);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
index 5c3b2533e0d..df17842de36 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobTest.java
@@ -62,7 +62,7 @@ public final class RuleAlteredJobTest {
@SneakyThrows(SQLException.class)
private void initTableData(final JobConfiguration jobConfig) {
YamlPipelineDataSourceConfiguration source = jobConfig.getPipelineConfig().getSource();
- try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceFactory().newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()));
+ try (PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(PipelineDataSourceConfigurationFactory.newInstance(source.getType(), source.getParameter()));
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");