You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/10/11 07:57:45 UTC
[shardingsphere] branch master updated: Revise show migration check status DistSQL impl; Clean unused dataConsistencyCheck methods (#21488)
This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 486a1565a31 Revise show migration check status DistSQL impl; Clean unused dataConsistencyCheck methods (#21488)
486a1565a31 is described below
commit 486a1565a31ce11ca09e0360621dbfb81d7c9d18
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Oct 11 15:57:36 2022 +0800
Revise show migration check status DistSQL impl; Clean unused dataConsistencyCheck methods (#21488)
* Revise show migration check status DistSQL impl
* Simplify aggregateDataConsistencyCheckResults impl
* Clean unused dataConsistencyCheck methods; Improve dataConsistencyCheck unit test
* Remove unused transient
* Recover wait time in IT
* Fix getJobProgressInfo
* Convert "NULL" to "" in check status
* Fix setRecordsCount too late
* Rename to emptyIfNull
---
.../ShowMigrationCheckStatusQueryResultSet.java | 13 +++++--
.../api/InventoryIncrementalJobPublicAPI.java | 20 ----------
.../consistency/DataConsistencyCheckResult.java | 9 +++++
.../api/pojo/ConsistencyCheckJobProgressInfo.java | 2 +-
.../core/api/InventoryIncrementalJobAPI.java | 2 +-
.../AbstractInventoryIncrementalJobAPIImpl.java | 22 +----------
...SingleTableInventoryDataConsistencyChecker.java | 25 +++++++------
.../ConsistencyCheckJobAPIImpl.java | 43 ++++++++++++----------
.../migration/MigrationDataConsistencyChecker.java | 7 +---
.../yaml/metadata/YamlPipelineColumnMetaData.java | 4 ++
.../data/pipeline/cases/base/BaseITCase.java | 2 +-
.../cases/migration/AbstractMigrationITCase.java | 2 +-
.../core/api/impl/MigrationJobAPIImplTest.java | 24 ++++--------
.../core/util/JobConfigurationBuilder.java | 6 ++-
14 files changed, 77 insertions(+), 104 deletions(-)
diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index e893f660de8..5c4a3b0247b 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -47,15 +47,20 @@ public final class ShowMigrationCheckStatusQueryResultSet implements DatabaseDis
ConsistencyCheckJobProgressInfo progressInfo = JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
List<Collection<Object>> result = new LinkedList<>();
String checkResult = null == progressInfo.getResult() ? "" : progressInfo.getResult().toString();
- result.add(Arrays.asList(progressInfo.getTableName(), checkResult, String.valueOf(progressInfo.getFinishedPercentage()),
- ObjectUtils.defaultIfNull(progressInfo.getRemainingSeconds(), ""), progressInfo.getCheckBeginTime(), ObjectUtils.defaultIfNull(progressInfo.getCheckEndTime(), ""),
- ObjectUtils.defaultIfNull(progressInfo.getDurationSeconds(), ""), progressInfo.getErrorMessage()));
+ result.add(Arrays.asList(emptyIfNull(progressInfo.getTableNames()), checkResult, String.valueOf(progressInfo.getFinishedPercentage()),
+ emptyIfNull(progressInfo.getRemainingSeconds()),
+ emptyIfNull(progressInfo.getCheckBeginTime()), emptyIfNull(progressInfo.getCheckEndTime()),
+ emptyIfNull(progressInfo.getDurationSeconds()), emptyIfNull(progressInfo.getErrorMessage())));
data = result.iterator();
}
+ private Object emptyIfNull(final Object object) {
+ return ObjectUtils.defaultIfNull(object, "");
+ }
+
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("table_name", "result", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
+ return Arrays.asList("tables", "result", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
}
@Override
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index 47b0051238c..20d50211524 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.api;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -27,7 +26,6 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;
-import java.util.Properties;
/**
* Inventory incremental job public API.
@@ -92,22 +90,4 @@ public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI,
* @return data consistency check algorithms
*/
Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms();
-
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
-
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @param algorithmType algorithm type
- * @param algorithmProps algorithm props. Nullable
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps);
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
index 00bf9120381..71b696fcded 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCheckResult.java
@@ -32,4 +32,13 @@ public final class DataConsistencyCheckResult {
private final DataConsistencyCountCheckResult countCheckResult;
private final DataConsistencyContentCheckResult contentCheckResult;
+
+ /**
+ * Is count and content matched.
+ *
+ * @return matched or not
+ */
+ public boolean isMatched() {
+ return countCheckResult.isMatched() && contentCheckResult.isMatched();
+ }
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
index 59604a4e991..e59755a9ccc 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
@@ -26,7 +26,7 @@ import lombok.Data;
@Data
public final class ConsistencyCheckJobProgressInfo {
- private String tableName;
+ private String tableNames;
private Boolean result;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 668724a5d56..79ad6b18c56 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -57,7 +57,7 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
*
* @param pipelineJobConfig job configuration
* @param calculateAlgorithm calculate algorithm
- * @param checkJobItemContext consistency check job progress listener
+ * @param checkJobItemContext consistency check job item context
* @return each logic table check result
*/
Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm,
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 19be5acdd2c..55e0e245383 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -155,23 +155,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
: DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps);
}
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
- checkModeConfig();
- log.info("Data consistency check for job {}", jobId);
- PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
- DataConsistencyCalculateAlgorithm calculateAlgorithm = buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
- return dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType, final Properties algorithmProps) {
- checkModeConfig();
- log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
- PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig, buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType, algorithmProps), null);
- }
-
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm,
final ConsistencyCheckJobItemContext checkJobItemContext) {
@@ -193,10 +176,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
}
for (Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
DataConsistencyCheckResult checkResult = entry.getValue();
- boolean isCountMatched = checkResult.getCountCheckResult().isMatched();
- boolean isContentMatched = checkResult.getContentCheckResult().isMatched();
- if (!isCountMatched || !isContentMatched) {
- log.error("job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
+ if (!checkResult.isMatched()) {
return false;
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 3547a2a1f48..a9eaa29f2a2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -100,12 +100,20 @@ public final class SingleTableInventoryDataConsistencyChecker {
}
private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor,
- final ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
+ final ConsistencyCheckJobItemContext checkJobItemContext) {
String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
String targetDatabaseType = targetDataSource.getDatabaseType().getType();
String sourceTableName = sourceTable.getTableName().getOriginal();
String schemaName = sourceTable.getSchemaName().getOriginal();
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, sourceTableName);
+ if (null != checkJobItemContext) {
+ checkJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
+ InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
+ long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+ checkJobItemContext.setRecordsCount(recordsCount);
+ log.info("check, get records count: {}", recordsCount);
+ }
ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
Collection<String> columnNames = tableMetaData.getColumnNames();
DataConsistencyCalculateParameter sourceParameter = buildParameter(
@@ -114,13 +122,6 @@ public final class SingleTableInventoryDataConsistencyChecker {
targetDataSource, targetTable.getSchemaName().getOriginal(), targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
- if (null != consistencyCheckJobItemContext) {
- consistencyCheckJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
- InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
- long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
- consistencyCheckJobItemContext.setRecordsCount(recordsCount);
- }
long sourceRecordsCount = 0;
long targetRecordsCount = 0;
boolean contentMatched = true;
@@ -139,12 +140,12 @@ public final class SingleTableInventoryDataConsistencyChecker {
log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
break;
}
- if (null != consistencyCheckJobItemContext) {
- consistencyCheckJobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+ if (null != checkJobItemContext) {
+ checkJobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
}
}
- if (null != consistencyCheckJobItemContext) {
- consistencyCheckJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
+ if (null != checkJobItemContext) {
+ checkJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
}
return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index c46efa359f1..60f9e0da017 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -36,11 +36,13 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgr
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyFinishedException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
@@ -49,13 +51,12 @@ import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
/**
@@ -64,6 +65,8 @@ import java.util.Optional;
@Slf4j
public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl implements ConsistencyCheckJobAPI {
+ private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
private final YamlConsistencyCheckJobProgressSwapper swapper = new YamlConsistencyCheckJobProgressSwapper();
@Override
@@ -175,33 +178,35 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
if (null == jobItemProgress) {
return result;
}
- int finishedPercentage;
LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
- if (null != jobItemProgress.getRecordsCount() && Objects.equals(jobItemProgress.getCheckedRecordsCount(), jobItemProgress.getRecordsCount())) {
- finishedPercentage = 100;
+ if (null == jobItemProgress.getRecordsCount()) {
+ result.setFinishedPercentage(0);
+ result.setResult(false);
+ return result;
+ }
+ long recordsCount = jobItemProgress.getRecordsCount();
+ if (JobStatus.FINISHED == jobItemProgress.getStatus()) {
+ result.setFinishedPercentage(100);
LocalDateTime checkEndTime = new Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
Duration duration = Duration.between(checkBeginTime, checkEndTime);
- result.setDurationSeconds(duration.toMillis() / 1000);
+ result.setDurationSeconds(duration.getSeconds());
result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
result.setRemainingSeconds(0L);
} else {
- if (null == jobItemProgress.getRecordsCount()) {
- finishedPercentage = 0;
- } else {
- finishedPercentage = Math.min(100, BigDecimal.valueOf(Math.floorDiv(jobItemProgress.getCheckedRecordsCount() * 100, jobItemProgress.getRecordsCount())).intValue());
- Duration duration = Duration.between(checkBeginTime, LocalDateTime.now());
- long remainMills = jobItemProgress.getRecordsCount() * 100 / jobItemProgress.getCheckedRecordsCount() * duration.toMillis();
- result.setRemainingSeconds(remainMills / 1000);
- }
+ long checkedRecordsCount = Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
+ result.setFinishedPercentage((int) (checkedRecordsCount * 100 / recordsCount));
+ Duration duration = Duration.between(checkBeginTime, LocalDateTime.now());
+ result.setDurationSeconds(duration.getSeconds());
+ long remainingMills = (recordsCount - checkedRecordsCount) / recordsCount * duration.toMillis();
+ result.setRemainingSeconds(remainingMills / 1000);
}
- result.setFinishedPercentage(finishedPercentage);
- String tableName = null == jobItemProgress.getTableNames() ? null : jobItemProgress.getTableNames().split(",")[0];
- result.setTableName(Optional.ofNullable(tableName).orElse(""));
+ String tableNames = jobItemProgress.getTableNames();
+ result.setTableNames(Optional.ofNullable(tableNames).orElse(""));
result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
Map<String, DataConsistencyCheckResult> checkJobResult = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(parentJobId, checkJobId);
- Optional<DataConsistencyCheckResult> dataConsistencyCheckResult = Optional.ofNullable(checkJobResult.get(tableName));
- dataConsistencyCheckResult.ifPresent(optional -> result.setResult(optional.getContentCheckResult().isMatched()));
+ InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(parentJobId));
+ result.setResult(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
return result;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 91fa69b6fdc..f6b97a2a455 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMap
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
@@ -75,12 +74,10 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getTarget());
SchemaTableName sourceTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getSourceTableName())), new TableName(jobConfig.getSourceTableName()));
SchemaTableName targetTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())), new TableName(jobConfig.getTargetTableName()));
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
- PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
try (
- PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
- PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
+ PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getSource());
+ PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), sourceDataSource, targetDataSource,
sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
index e6f88962b17..c4a96dc64e4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/metadata/YamlPipelineColumnMetaData.java
@@ -17,12 +17,16 @@
package org.apache.shardingsphere.data.pipeline.yaml.metadata;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
* Yaml pipeline column meta data.
*/
+@NoArgsConstructor
+@AllArgsConstructor
@Data
public final class YamlPipelineColumnMetaData implements YamlConfiguration {
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index f269624890e..328bbfc0b1d 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -286,7 +286,7 @@ public abstract class BaseITCase {
}
assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
- if (Collections.min(incrementalIdleSecondsList) < 15) {
+ if (Collections.min(incrementalIdleSecondsList) <= 5) {
ThreadUtil.sleep(3, TimeUnit.SECONDS);
continue;
}
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index a04d7224de4..ac23115bf9b 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -172,7 +172,7 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
if (checkEndTimeList.size() == resultList.size()) {
break;
}
- ThreadUtil.sleep(5, TimeUnit.SECONDS);
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
}
log.info("check job results: {}", resultList);
for (Map<String, Object> entry : resultList) {
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 9cd96221ed8..452001a8b4a 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -27,17 +27,16 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
-import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.junit.AfterClass;
@@ -149,22 +148,13 @@ public final class MigrationJobAPIImplTest {
}
@Test
- public void assertDataConsistencyCheck() throws NoSuchFieldException, IllegalAccessException {
- MigrationJobConfiguration jobConfiguration = JobConfigurationBuilder.createJobConfiguration();
- ReflectionUtil.setFieldValue(jobConfiguration, "uniqueKeyColumn", new PipelineColumnMetaData(1, "order_id", 4, "", false, true, true));
- Optional<String> jobId = jobAPI.start(jobConfiguration);
- assertTrue(jobId.isPresent());
- initTableData(jobAPI.getJobConfiguration(jobId.get()));
- Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobId.get());
- assertThat(checkResultMap.size(), is(1));
- }
-
- @Test
- public void assertDataConsistencyCheckWithAlgorithm() {
- Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ public void assertDataConsistencyCheck() {
+ MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
+ initTableData(jobConfig);
+ Optional<String> jobId = jobAPI.start(jobConfig);
assertTrue(jobId.isPresent());
- initTableData(jobAPI.getJobConfiguration(jobId.get()));
- Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobId.get(), "FIXTURE", null);
+ DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(jobConfig, "FIXTURE", null);
+ Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
assertThat(checkResultMap.size(), is(1));
assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(2L));
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
index 31a0d5ea6a4..72f4bb695d3 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -21,8 +21,6 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -31,6 +29,9 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.yaml.metadata.YamlPipelineColumnMetaData;
/**
* Job configuration builder.
@@ -53,6 +54,7 @@ public final class JobConfigurationBuilder {
result.setSource(createYamlPipelineDataSourceConfiguration(new StandardPipelineDataSourceConfiguration(ConfigurationFileUtil.readFile("migration_standard_jdbc_source.yaml"))));
result.setTarget(createYamlPipelineDataSourceConfiguration(new ShardingSpherePipelineDataSourceConfiguration(
ConfigurationFileUtil.readFile("migration_sharding_sphere_jdbc_target.yaml"))));
+ result.setUniqueKeyColumn(new YamlPipelineColumnMetaData(1, "order_id", 4, "", false, true, true));
PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION).extendYamlJobConfiguration(result);
return new YamlMigrationJobConfigurationSwapper().swapToObject(result);
}