You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/11 03:24:57 UTC
[shardingsphere] branch master updated: Refactor show migration check status DistSQL implementation (#21441)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c4b847a7a2c Refactor show migration check status DistSQL implementation (#21441)
c4b847a7a2c is described below
commit c4b847a7a2c737a1885ead254a5c9ae229f64136
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Oct 11 11:24:45 2022 +0800
Refactor show migration check status DistSQL implementation (#21441)
* Refactor show migration check status DistSQL implementation
* Fix ci
* Fix conflict
* Fix ci
* Fix codestyle
* Fix name
---
.../ShowMigrationCheckStatusQueryResultSet.java | 24 ++++-----
.../ShowMigrationCheckStatusStatement.java | 2 +-
.../pipeline/api/ConsistencyCheckJobPublicAPI.java | 9 ++++
.../job/progress/ConsistencyCheckJobProgress.java | 10 ++++
.../PipelineJobProgressUpdatedParameter.java | 4 +-
.../api/pojo/ConsistencyCheckJobProgressInfo.java} | 24 +++++++--
.../pipeline/core/api/GovernanceRepositoryAPI.java | 4 +-
.../core/api/InventoryIncrementalJobAPI.java | 5 +-
.../AbstractInventoryIncrementalJobAPIImpl.java | 14 +++--
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 4 +-
...SingleTableInventoryDataConsistencyChecker.java | 33 ++++++++++--
.../pipeline/core/importer/DefaultImporter.java | 2 +-
.../yaml/YamlConsistencyCheckJobProgress.java | 10 ++++
.../YamlConsistencyCheckJobProgressSwapper.java | 10 ++++
.../consistencycheck/ConsistencyCheckJob.java | 3 ++
.../ConsistencyCheckJobAPIImpl.java | 61 +++++++++++++++++++---
.../ConsistencyCheckJobItemContext.java | 25 ++++++++-
.../ConsistencyCheckTasksRunner.java | 3 +-
.../migration/MigrationDataConsistencyChecker.java | 11 ++--
.../scenario/migration/MigrationJobAPIImpl.java | 6 ++-
.../migration/MigrationJobItemContext.java | 3 +-
.../data/pipeline/cases/base/BaseITCase.java | 3 +-
.../cases/migration/AbstractMigrationITCase.java | 17 +++---
.../core/api/impl/MigrationJobAPIImplTest.java | 2 +-
.../MigrationDataConsistencyCheckerTest.java | 21 +++++++-
25 files changed, 245 insertions(+), 65 deletions(-)
diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index 5c90cfbb7b1..e893f660de8 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -17,22 +17,20 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
/**
* Show migration check status query result set.
@@ -46,20 +44,18 @@ public final class ShowMigrationCheckStatusQueryResultSet implements DatabaseDis
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
ShowMigrationCheckStatusStatement checkMigrationStatement = (ShowMigrationCheckStatusStatement) sqlStatement;
- Map<String, DataConsistencyCheckResult> consistencyCheckResult = JOB_API.getLatestDataConsistencyCheckResult(checkMigrationStatement.getJobId());
- List<Collection<Object>> result = new ArrayList<>(consistencyCheckResult.size());
- for (Entry<String, DataConsistencyCheckResult> entry : consistencyCheckResult.entrySet()) {
- DataConsistencyCheckResult value = entry.getValue();
- DataConsistencyCountCheckResult countCheckResult = value.getCountCheckResult();
- result.add(Arrays.asList(entry.getKey(), countCheckResult.getSourceRecordsCount(), countCheckResult.getTargetRecordsCount(), String.valueOf(countCheckResult.isMatched()),
- String.valueOf(value.getContentCheckResult().isMatched())));
- }
+ ConsistencyCheckJobProgressInfo progressInfo = JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
+ List<Collection<Object>> result = new LinkedList<>();
+ String checkResult = null == progressInfo.getResult() ? "" : progressInfo.getResult().toString();
+ result.add(Arrays.asList(progressInfo.getTableName(), checkResult, String.valueOf(progressInfo.getFinishedPercentage()),
+ ObjectUtils.defaultIfNull(progressInfo.getRemainingSeconds(), ""), progressInfo.getCheckBeginTime(), ObjectUtils.defaultIfNull(progressInfo.getCheckEndTime(), ""),
+ ObjectUtils.defaultIfNull(progressInfo.getDurationSeconds(), ""), progressInfo.getErrorMessage()));
data = result.iterator();
}
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("table_name", "source_records_count", "target_records_count", "records_count_matched", "records_content_matched");
+ return Arrays.asList("table_name", "result", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
}
@Override
diff --git a/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java b/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
index fe8a36c0e9d..e9b2e6932d5 100644
--- a/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
+++ b/features/sharding/distsql/statement/src/main/java/org/apache/shardingsphere/migration/distsql/statement/ShowMigrationCheckStatusStatement.java
@@ -22,7 +22,7 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.distsql.parser.statement.ral.scaling.QueryableScalingRALStatement;
/**
- * Show check migration status statement.
+ * Show migration check status statement.
*/
@RequiredArgsConstructor
@Getter
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index 008f0002705..2e59d9e3976 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.api;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -59,4 +60,12 @@ public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, Requ
* @param parentJobId parent job id
*/
void stopByParentJobId(String parentJobId);
+
+ /**
+ * Get consistency job progress info.
+ *
+ * @param parentJobId parent job id
+ * @return consistency job progress info
+ */
+ ConsistencyCheckJobProgressInfo getJobProgressInfo(String parentJobId);
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
index 22a16efa840..e3297179140 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
@@ -31,4 +31,14 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
public final class ConsistencyCheckJobProgress implements PipelineJobItemProgress {
private JobStatus status = JobStatus.RUNNING;
+
+ private String tableNames;
+
+ private Long checkedRecordsCount;
+
+ private Long recordsCount;
+
+ private Long checkBeginTimeMillis;
+
+ private Long checkEndTimeMillis;
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
index 1a21a14bb7e..ab195bc42e4 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
@@ -27,7 +27,5 @@ import lombok.RequiredArgsConstructor;
@Getter
public final class PipelineJobProgressUpdatedParameter {
- private final int insertedRecordsCount;
-
- private final int deletedRecordsCount;
+ private final int processedRecordsCount;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
similarity index 65%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
index f0baaf647c8..59604a4e991 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
@@ -15,16 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
+package org.apache.shardingsphere.data.pipeline.api.pojo;
import lombok.Data;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Yaml data consistency check job progress.
+ * Consistency check job progress info.
*/
+
@Data
-public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
+public final class ConsistencyCheckJobProgressInfo {
+
+ private String tableName;
+
+ private Boolean result;
+
+ private int finishedPercentage;
+
+ private Long remainingSeconds;
+
+ private String checkBeginTime;
+
+ private String checkEndTime;
+
+ private Long durationSeconds;
- private String status;
+ private String errorMessage;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index e7dc53f1050..17874b2e8bb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -76,11 +76,11 @@ public interface GovernanceRepositoryAPI {
/**
* Get check job result.
*
- * @param jobId job id
+ * @param parentJobId parent job id
* @param checkJobId check job id
* @return check job result
*/
- Map<String, DataConsistencyCheckResult> getCheckJobResult(String jobId, String checkJobId);
+ Map<String, DataConsistencyCheckResult> getCheckJobResult(String parentJobId, String checkJobId);
/**
* Persist check job result.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 077d08c7281..668724a5d56 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import java.util.Map;
@@ -56,9 +57,11 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
*
* @param pipelineJobConfig job configuration
* @param calculateAlgorithm calculate algorithm
+ * @param checkJobItemContext consistency check job item context that receives check progress updates, nullable
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm);
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm,
+ ConsistencyCheckJobItemContext checkJobItemContext);
/**
* Aggregate data consistency check results.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 2881877cbe6..19be5acdd2c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProce
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
@@ -160,7 +161,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
log.info("Data consistency check for job {}", jobId);
PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
DataConsistencyCalculateAlgorithm calculateAlgorithm = buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
- return dataConsistencyCheck(jobConfig, calculateAlgorithm);
+ return dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
}
@Override
@@ -168,18 +169,21 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
checkModeConfig();
log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig, buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType, algorithmProps));
+ return dataConsistencyCheck(jobConfig, buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType, algorithmProps), null);
}
@Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm,
+ final ConsistencyCheckJobItemContext checkJobItemContext) {
String jobId = jobConfig.getJobId();
- Map<String, DataConsistencyCheckResult> result = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
+ PipelineDataConsistencyChecker dataConsistencyChecker = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig), checkJobItemContext);
+ Map<String, DataConsistencyCheckResult> result = dataConsistencyChecker.check(calculateAlgorithm);
log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
return result;
}
- protected abstract PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext);
+ protected abstract PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext,
+ ConsistencyCheckJobItemContext checkJobItemContext);
@Override
public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 86ce2c90139..64a876cc8f9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -77,9 +77,9 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
@SuppressWarnings("unchecked")
@Override
- public Map<String, DataConsistencyCheckResult> getCheckJobResult(final String jobId, final String checkJobId) {
+ public Map<String, DataConsistencyCheckResult> getCheckJobResult(final String parentJobId, final String checkJobId) {
Map<String, DataConsistencyCheckResult> result = new HashMap<>();
- String yamlCheckResultMapText = repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
+ String yamlCheckResultMapText = repository.getDirectly(PipelineMetaDataNode.getCheckJobResultPath(parentJobId, checkJobId));
if (StringUtils.isBlank(yamlCheckResultMapText)) {
return Collections.emptyMap();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index 3125fc778fc..3547a2a1f48 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
@@ -26,12 +28,16 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -40,7 +46,9 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.
import java.sql.SQLException;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
@@ -72,6 +80,8 @@ public final class SingleTableInventoryDataConsistencyChecker {
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+ private final ConsistencyCheckJobItemContext consistencyCheckJobItemContext;
+
/**
* Data consistency check.
*
@@ -82,26 +92,35 @@ public final class SingleTableInventoryDataConsistencyChecker {
ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + "-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
try {
- return check(calculateAlgorithm, executor);
+ return check(calculateAlgorithm, executor, consistencyCheckJobItemContext);
} finally {
executor.shutdown();
executor.shutdownNow();
}
}
- private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor) {
+ private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor,
+ final ConsistencyCheckJobItemContext consistencyCheckJobItemContext) {
String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
String targetDatabaseType = targetDataSource.getDatabaseType().getType();
String sourceTableName = sourceTable.getTableName().getOriginal();
- PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(sourceTable.getSchemaName().getOriginal(), sourceTableName);
+ String schemaName = sourceTable.getSchemaName().getOriginal();
+ PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, sourceTableName);
ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
Collection<String> columnNames = tableMetaData.getColumnNames();
DataConsistencyCalculateParameter sourceParameter = buildParameter(
- sourceDataSource, sourceTable.getSchemaName().getOriginal(), sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+ sourceDataSource, schemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
DataConsistencyCalculateParameter targetParameter = buildParameter(
targetDataSource, targetTable.getSchemaName().getOriginal(), targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
+ if (null != consistencyCheckJobItemContext) {
+ consistencyCheckJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
+ InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
+ long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+ consistencyCheckJobItemContext.setRecordsCount(recordsCount);
+ }
long sourceRecordsCount = 0;
long targetRecordsCount = 0;
boolean contentMatched = true;
@@ -120,6 +139,12 @@ public final class SingleTableInventoryDataConsistencyChecker {
log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
break;
}
+ if (null != consistencyCheckJobItemContext) {
+ consistencyCheckJobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+ }
+ }
+ if (null != consistencyCheckJobItemContext) {
+ consistencyCheckJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
}
return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 4b87e973cf9..699fc6d1878 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -125,7 +125,7 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
flushInternal(dataSource, each.getInsertDataRecords());
flushInternal(dataSource, each.getUpdateDataRecords());
}
- return new PipelineJobProgressUpdatedParameter(insertRecordNumber, deleteRecordNumber);
+ return new PipelineJobProgressUpdatedParameter(insertRecordNumber - deleteRecordNumber);
}
private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
index f0baaf647c8..d915fce6a1a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
@@ -27,4 +27,14 @@ import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
private String status;
+
+ private String tableNames;
+
+ private Long checkedRecordsCount;
+
+ private Long recordsCount;
+
+ private Long checkBeginTimeMillis;
+
+ private Long checkEndTimeMillis;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
index 8796af3c6d8..a3f5dcf20b1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
@@ -30,6 +30,11 @@ public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigu
public YamlConsistencyCheckJobProgress swapToYamlConfiguration(final ConsistencyCheckJobProgress data) {
YamlConsistencyCheckJobProgress result = new YamlConsistencyCheckJobProgress();
result.setStatus(data.getStatus().name());
+ result.setRecordsCount(data.getRecordsCount());
+ result.setCheckedRecordsCount(data.getCheckedRecordsCount());
+ result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis());
+ result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
+ result.setTableNames(data.getTableNames());
return result;
}
@@ -37,6 +42,11 @@ public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigu
public ConsistencyCheckJobProgress swapToObject(final YamlConsistencyCheckJobProgress yamlConfig) {
ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
+ result.setRecordsCount(yamlConfig.getRecordsCount());
+ result.setCheckedRecordsCount(yamlConfig.getCheckedRecordsCount());
+ result.setCheckBeginTimeMillis(yamlConfig.getCheckBeginTimeMillis());
+ result.setCheckEndTimeMillis(yamlConfig.getCheckEndTimeMillis());
+ result.setTableNames(yamlConfig.getTableNames());
return result;
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index b86955e252d..cce3ea67e4c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
@@ -55,6 +56,7 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
}
ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
tasksRunner.start();
+ PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId, shardingContext.getShardingItem());
getTasksRunnerMap().put(shardingItem, tasksRunner);
}
@@ -74,5 +76,6 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
getTasksRunnerMap().clear();
String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 0);
+ PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index b91daee9f9e..c46efa359f1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -23,8 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
@@ -34,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -44,12 +43,19 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
/**
@@ -103,8 +109,14 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+ ConsistencyCheckJobItemContext checkJobItemContext = (ConsistencyCheckJobItemContext) jobItemContext;
ConsistencyCheckJobProgress jobProgress = new ConsistencyCheckJobProgress();
jobProgress.setStatus(jobItemContext.getStatus());
+ jobProgress.setCheckedRecordsCount(checkJobItemContext.getCheckedRecordsCount().get());
+ jobProgress.setRecordsCount(checkJobItemContext.getRecordsCount());
+ jobProgress.setCheckBeginTimeMillis(checkJobItemContext.getCheckBeginTimeMillis());
+ jobProgress.setCheckEndTimeMillis(checkJobItemContext.getCheckEndTimeMillis());
+ jobProgress.setTableNames(null == checkJobItemContext.getTableNames() ? null : String.join(",", checkJobItemContext.getTableNames()));
YamlConsistencyCheckJobProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobProgress);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
}
@@ -115,10 +127,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
if (StringUtils.isBlank(progress)) {
return null;
}
- ConsistencyCheckJobProgress jobProgress = swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobProgress.class, true));
- ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
- result.setStatus(jobProgress.getStatus());
- return result;
+ return swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobProgress.class, true));
}
@Override
@@ -156,6 +165,46 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
stop(checkLatestJobId.get());
}
+    @Override
+    public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String parentJobId) {
+        Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
+        ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new PipelineJobNotFoundException(parentJobId));
+        String checkJobId = checkLatestJobId.get();
+        ConsistencyCheckJobProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
+        ConsistencyCheckJobProgressInfo result = new ConsistencyCheckJobProgressInfo();
+        if (null == jobItemProgress) {
+            return result;
+        }
+        Long recordsCount = jobItemProgress.getRecordsCount();
+        Long checkedRecordsCount = jobItemProgress.getCheckedRecordsCount();
+        Long checkEndTimeMillis = jobItemProgress.getCheckEndTimeMillis();
+        int finishedPercentage = 0;
+        LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
+        if (null != recordsCount && Objects.equals(checkedRecordsCount, recordsCount)) {
+            finishedPercentage = 100;
+            result.setRemainingSeconds(0L);
+            // The end time is stamped only after the check result has been persisted, so a snapshot with all records checked may not carry it yet.
+            if (null != checkEndTimeMillis) {
+                LocalDateTime checkEndTime = new Timestamp(checkEndTimeMillis).toLocalDateTime();
+                result.setDurationSeconds(Duration.between(checkBeginTime, checkEndTime).toMillis() / 1000);
+                result.setCheckEndTime(DATE_TIME_FORMATTER.format(checkEndTime));
+            }
+        } else if (null != recordsCount && null != checkedRecordsCount && recordsCount > 0 && checkedRecordsCount > 0) {
+            finishedPercentage = Math.min(100, BigDecimal.valueOf(Math.floorDiv(checkedRecordsCount * 100, recordsCount)).intValue());
+            long elapsedMillis = Duration.between(checkBeginTime, LocalDateTime.now()).toMillis();
+            // Linear estimate: remaining = elapsed * unchecked / checked. Skipped while nothing is checked yet, which previously divided by zero.
+            result.setRemainingSeconds(Math.max(0L, recordsCount - checkedRecordsCount) * elapsedMillis / checkedRecordsCount / 1000);
+        }
+        result.setFinishedPercentage(finishedPercentage);
+        String tableName = null == jobItemProgress.getTableNames() ? null : jobItemProgress.getTableNames().split(",")[0];
+        result.setTableName(Optional.ofNullable(tableName).orElse(""));
+        result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
+        result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
+        Map<String, DataConsistencyCheckResult> checkJobResult = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckJobResult(parentJobId, checkJobId);
+        Optional.ofNullable(checkJobResult.get(tableName)).ifPresent(each -> result.setResult(each.getContentCheckResult().isMatched()));
+        return result;
+    }
+
@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index fb5841d9fde..7b812529dd6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -24,6 +24,12 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJo
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Consistency check job item context.
@@ -31,7 +37,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@Getter
@Setter
@Slf4j
-public final class ConsistencyCheckJobItemContext implements PipelineJobItemContext {
+public final class ConsistencyCheckJobItemContext implements PipelineJobItemContext, PipelineJobProgressListener {
private final String jobId;
@@ -43,6 +49,16 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
private volatile JobStatus status;
+ private Collection<String> tableNames;
+
+ private volatile Long recordsCount;
+
+ private final AtomicLong checkedRecordsCount = new AtomicLong(0);
+
+ private final long checkBeginTimeMillis;
+
+ private Long checkEndTimeMillis;
+
private final ConsistencyCheckJobConfiguration jobConfig;
public ConsistencyCheckJobItemContext(final ConsistencyCheckJobConfiguration jobConfig, final int shardingItem, final JobStatus status) {
@@ -50,10 +66,17 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
this.status = status;
+ checkBeginTimeMillis = System.currentTimeMillis();
}
@Override
public PipelineProcessContext getJobProcessContext() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
+ checkedRecordsCount.addAndGet(parameter.getProcessedRecordsCount());
+ PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
index 69055b5a85d..a13093312a4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -99,8 +99,9 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(
parentJobConfig, checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
this.calculateAlgorithm = calculateAlgorithm;
- Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm, jobItemContext);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+ jobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 50ab3a39afb..91fa69b6fdc 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -57,11 +58,15 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
- public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
+ private final ConsistencyCheckJobItemContext checkJobItemContext;
+
+ public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext,
+ final ConsistencyCheckJobItemContext checkJobItemContext) {
this.jobConfig = jobConfig;
readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
+ this.checkJobItemContext = checkJobItemContext;
}
@Override
@@ -77,8 +82,8 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
- SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(
- jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm);
+ SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), sourceDataSource, targetDataSource,
+ sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index d73e6c07c47..72fe7f49559 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -63,6 +63,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTabl
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
@@ -241,8 +242,9 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
}
@Override
- protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext) {
- return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext);
+ protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext,
+ final ConsistencyCheckJobItemContext checkJobItemContext) {
+ return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, checkJobItemContext);
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index da3eb7b8203..35651826bd2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -132,8 +132,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
- int needAddNumber = parameter.getInsertedRecordsCount() - parameter.getDeletedRecordsCount();
- processedRecordsCount.addAndGet(needAddNumber);
+ processedRecordsCount.addAndGet(parameter.getProcessedRecordsCount());
PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
}
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 644917216ba..f269624890e 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -22,6 +22,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -238,7 +239,7 @@ public abstract class BaseITCase {
ResultSet resultSet = connection.createStatement().executeQuery(sql);
List<Map<String, Object>> result = resultSetToList(resultSet);
log.info("proxy query for list, sql: {}, result: {}", sql, result);
- return result;
+ return ObjectUtils.defaultIfNull(result, Collections.emptyList());
} catch (final SQLException ex) {
log.error("data access error", ex);
}
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 5032f649681..a04d7224de4 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.integration.data.pipeline.cases.migration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseITCase;
import org.apache.shardingsphere.integration.data.pipeline.command.MigrationDistSQLCommand;
@@ -164,19 +165,19 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) throws SQLException {
proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
- List<Map<String, Object>> checkJobResults = Collections.emptyList();
+ List<Map<String, Object>> resultList = Collections.emptyList();
for (int i = 0; i < 10; i++) {
- checkJobResults = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
- if (null != checkJobResults && !checkJobResults.isEmpty()) {
+ resultList = queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
+ List<String> checkEndTimeList = resultList.stream().map(map -> map.get("check_end_time").toString()).filter(StringUtils::isNotBlank).collect(Collectors.toList());
+ if (checkEndTimeList.size() == resultList.size()) {
break;
}
ThreadUtil.sleep(5, TimeUnit.SECONDS);
}
- assertTrue(null != checkJobResults && !checkJobResults.isEmpty());
- log.info("check job results: {}", checkJobResults);
- for (Map<String, Object> entry : checkJobResults) {
- assertTrue(Boolean.parseBoolean(entry.get("records_count_matched").toString()));
- assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
+ log.info("check job results: {}", resultList);
+ for (Map<String, Object> entry : resultList) {
+ assertTrue(Boolean.parseBoolean(entry.get("result").toString()));
+ assertThat(entry.get("finished_percentage").toString(), is("100"));
}
}
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index f8dd4794b7b..9cd96221ed8 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -29,8 +29,8 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index fa646106a05..1b008f63e67 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -18,12 +18,19 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -31,6 +38,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
+import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -46,13 +54,22 @@ public final class MigrationDataConsistencyCheckerTest {
@Test
public void assertCountAndDataCheck() throws SQLException {
MigrationJobConfiguration jobConfig = createJobConfiguration();
- Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null))
- .check(new DataConsistencyCalculateAlgorithmFixture());
+ JobConfigurationPOJO jobConfigurationPOJO = new JobConfigurationPOJO();
+ jobConfigurationPOJO.setJobParameter(YamlEngine.marshal(new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobConfig.getJobId(), 0, "");
+ Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
+ createConsistencyCheckJobItemConfig()).check(new DataConsistencyCalculateAlgorithmFixture());
assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
assertTrue(actual.get("t_order").getContentCheckResult().isMatched());
}
+ private ConsistencyCheckJobItemContext createConsistencyCheckJobItemConfig() {
+ ConsistencyCheckJobConfiguration jobConfig = new ConsistencyCheckJobConfiguration("", "", "", new Properties());
+ return new ConsistencyCheckJobItemContext(jobConfig, 0, JobStatus.RUNNING);
+ }
+
private MigrationJobConfiguration createJobConfiguration() throws SQLException {
MigrationJobItemContext jobItemContext = PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration());
initTableData(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());