You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/09/30 03:28:35 UTC
[shardingsphere] branch master updated: Revise consistency check job code; Migration job drop cascade check jobs (#21273)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bc2d9b63428 Revise consistency check job code; Migration job drop cascade check jobs (#21273)
bc2d9b63428 is described below
commit bc2d9b634288205b7602fbe00d66161c8a585ffe
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Sep 30 11:28:25 2022 +0800
Revise consistency check job code; Migration job drop cascade check jobs (#21273)
* Add log
* Revise consistency check job code
* Revise consistency check job code
* Migration job drop cascade check jobs
* Update sequence range
* Unit test
---
.../pipeline/api/ConsistencyCheckJobPublicAPI.java | 2 +-
.../job/ConsistencyCheckJobConfiguration.java | 2 +-
.../yaml/YamlConsistencyCheckJobConfiguration.java | 2 +-
...amlConsistencyCheckJobConfigurationSwapper.java | 4 +--
.../job/progress/ConsistencyCheckJobProgress.java | 4 ++-
.../dumper/IncrementalDumperCreatorFactory.java | 14 +++++++-
.../pipeline/core/api/GovernanceRepositoryAPI.java | 16 ++++++---
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 32 ++++++++++-------
...mpletedConsistencyCheckJobExistsException.java} | 8 ++---
.../yaml/YamlConsistencyCheckJobProgress.java | 2 +-
.../metadata/generator/PipelineDDLGenerator.java | 4 ++-
.../core/metadata/node/PipelineMetaDataNode.java | 4 +--
.../core/prepare/InventoryTaskSplitter.java | 2 ++
.../core/prepare/PipelineJobPreparerUtils.java | 24 ++++++++-----
.../data/pipeline/core/task/IncrementalTask.java | 2 +-
...tencyCheckChangedJobConfigurationProcessor.java | 2 +-
.../consistencycheck/ConsistencyCheckJob.java | 3 +-
.../consistencycheck/ConsistencyCheckJobAPI.java | 1 -
.../ConsistencyCheckJobAPIImpl.java | 41 ++++++++++++----------
.../consistencycheck/ConsistencyCheckJobId.java | 20 ++++++-----
.../ConsistencyCheckJobItemContext.java | 2 +-
.../pipeline/scenario/migration/MigrationJob.java | 2 ++
.../scenario/migration/MigrationJobAPIImpl.java | 19 ++++++++++
.../scenario/migration/MigrationJobPreparer.java | 11 +++---
.../api/impl/ConsistencyCheckJobAPIImplTest.java | 15 ++++----
25 files changed, 152 insertions(+), 86 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index 5e226ce1124..008f0002705 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -31,7 +31,7 @@ import java.util.Map;
public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, RequiredSPI {
/**
- * Create job migration config and start.
+ * Create consistency check configuration and start job.
*
* @param parameter create consistency check job parameter
* @return job id
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
index 8576a374cae..b80eab65ee1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/ConsistencyCheckJobConfiguration.java
@@ -39,7 +39,7 @@ public final class ConsistencyCheckJobConfiguration implements PipelineJobConfig
private final String algorithmTypeName;
- private final Properties algorithmProperties;
+ private final Properties algorithmProps;
@Override
public String getSourceDatabaseType() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
index 53ec03e9c42..69ace660ef1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfiguration.java
@@ -39,7 +39,7 @@ public final class YamlConsistencyCheckJobConfiguration implements YamlPipelineJ
private String algorithmTypeName;
- private Properties algorithmProperties;
+ private Properties algorithmProps;
@Override
public String getTargetDatabaseName() {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
index 64ce12e1029..f945d8d0804 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/job/yaml/YamlConsistencyCheckJobConfigurationSwapper.java
@@ -34,13 +34,13 @@ public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlCo
result.setJobId(data.getJobId());
result.setParentJobId(data.getParentJobId());
result.setAlgorithmTypeName(data.getAlgorithmTypeName());
- result.setAlgorithmProperties(data.getAlgorithmProperties());
+ result.setAlgorithmProps(data.getAlgorithmProps());
return result;
}
@Override
public ConsistencyCheckJobConfiguration swapToObject(final YamlConsistencyCheckJobConfiguration yamlConfig) {
- return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), yamlConfig.getAlgorithmProperties());
+ return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), yamlConfig.getAlgorithmProps());
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
index dbb318489b4..22a16efa840 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
@@ -19,13 +19,15 @@ package org.apache.shardingsphere.data.pipeline.api.job.progress;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
/**
- * Data check job item progress.
+ * Data consistency check job progress.
*/
@Getter
@Setter
+@ToString
public final class ConsistencyCheckJobProgress implements PipelineJobItemProgress {
private JobStatus status = JobStatus.RUNNING;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
index f81e9fbcd61..1f39e23fd02 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
@@ -22,6 +22,8 @@ import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+import java.util.Optional;
+
/**
* Incremental dumper creator factory.
*/
@@ -33,7 +35,7 @@ public class IncrementalDumperCreatorFactory {
}
/**
- * Incremental dumper creator.
+ * Get incremental dumper creator.
*
* @param databaseType database type
* @return incremental dumper creator
@@ -41,4 +43,14 @@ public class IncrementalDumperCreatorFactory {
public static IncrementalDumperCreator getInstance(final String databaseType) {
return TypedSPIRegistry.getRegisteredService(IncrementalDumperCreator.class, databaseType);
}
+
+ /**
+ * Find incremental dumper creator.
+ *
+ * @param databaseType database type
+ * @return incremental dumper creator optional
+ */
+ public static Optional<IncrementalDumperCreator> findInstance(final String databaseType) {
+ return TypedSPIRegistry.findRegisteredService(IncrementalDumperCreator.class, databaseType);
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index edbc645a141..e7dc53f1050 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -66,7 +66,7 @@ public interface GovernanceRepositoryAPI {
Optional<String> getCheckLatestJobId(String jobId);
/**
- * Persist check latest result.
+ * Persist check latest job id.
*
* @param jobId job id
* @param checkJobId check job id
@@ -83,13 +83,21 @@ public interface GovernanceRepositoryAPI {
Map<String, DataConsistencyCheckResult> getCheckJobResult(String jobId, String checkJobId);
/**
- * Persist check latest detailed result.
+ * Persist check job result.
*
* @param jobId job id
* @param checkJobId check job id
- * @param dataConsistencyCheckResult check result
+ * @param checkResultMap check result map
*/
- void persistCheckJobResult(String jobId, String checkJobId, Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult);
+ void persistCheckJobResult(String jobId, String checkJobId, Map<String, DataConsistencyCheckResult> checkResultMap);
+
+ /**
+ * Delete check job result.
+ *
+ * @param jobId job id
+ * @param checkJobId check job id
+ */
+ void deleteCheckJobResult(String jobId, String checkJobId);
/**
* List check job ids.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 3baf3db9f5f..329915e7485 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -79,30 +79,36 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
@Override
public Map<String, DataConsistencyCheckResult> getCheckJobResult(final String jobId, final String checkJobId) {
Map<String, DataConsistencyCheckResult> result = new HashMap<>();
- String checkJobText = repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
- if (StringUtils.isBlank(checkJobText)) {
+ String yamlCheckResultMapText = repository.get(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
+ if (StringUtils.isBlank(yamlCheckResultMapText)) {
return Collections.emptyMap();
}
- Map<String, String> checkJobConfigMap = YamlEngine.unmarshal(checkJobText, Map.class, true);
- for (Entry<String, String> entry : checkJobConfigMap.entrySet()) {
+ Map<String, String> yamlCheckResultMap = YamlEngine.unmarshal(yamlCheckResultMapText, Map.class, true);
+ for (Entry<String, String> entry : yamlCheckResultMap.entrySet()) {
result.put(entry.getKey(), YamlDataConsistencyCheckResultSwapper.swapToObject(entry.getValue()));
}
return result;
}
@Override
- public void persistCheckJobResult(final String jobId, final String checkJobId, final Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult) {
- if (null == dataConsistencyCheckResult) {
- log.warn("data consistency check is null, jobId {}, checkJobId {}", jobId, checkJobId);
+ public void persistCheckJobResult(final String jobId, final String checkJobId, final Map<String, DataConsistencyCheckResult> checkResultMap) {
+ if (null == checkResultMap) {
+ log.warn("checkResultMap is null, jobId {}, checkJobId {}", jobId, checkJobId);
return;
}
- log.info("persist check job result '{}' for job {}", checkJobId, jobId);
- Map<String, String> checkResultMap = new LinkedHashMap<>();
- for (Entry<String, DataConsistencyCheckResult> entry : dataConsistencyCheckResult.entrySet()) {
- YamlDataConsistencyCheckResult checkResult = new YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
- checkResultMap.put(entry.getKey(), YamlEngine.marshal(checkResult));
+ log.info("persist check job result for job {}", checkJobId);
+ Map<String, String> yamlCheckResultMap = new LinkedHashMap<>();
+ for (Entry<String, DataConsistencyCheckResult> entry : checkResultMap.entrySet()) {
+ YamlDataConsistencyCheckResult yamlCheckResult = new YamlDataConsistencyCheckResultSwapper().swapToYamlConfiguration(entry.getValue());
+ yamlCheckResultMap.put(entry.getKey(), YamlEngine.marshal(yamlCheckResult));
}
- repository.persist(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId), YamlEngine.marshal(checkResultMap));
+ repository.persist(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId), YamlEngine.marshal(yamlCheckResultMap));
+ }
+
+ @Override
+ public void deleteCheckJobResult(final String jobId, final String checkJobId) {
+ log.info("deleteCheckJobResult, jobId={}, checkJobId={}", jobId, checkJobId);
+ repository.delete(PipelineMetaDataNode.getCheckJobResultPath(jobId, checkJobId));
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
similarity index 77%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
index dff0e6a1905..e5ab9d1c8db 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyExistedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * Pipeline job has already existed exception.
+ * Uncompleted consistency check job exists exception.
*/
-public final class PipelineJobHasAlreadyExistedException extends PipelineSQLException {
+public final class UncompletedConsistencyCheckJobExistsException extends PipelineSQLException {
private static final long serialVersionUID = 2854259384634892428L;
- public PipelineJobHasAlreadyExistedException(final String jobId) {
- super(XOpenSQLState.GENERAL_ERROR, 86, "Job `%s` has already existed", jobId);
+ public UncompletedConsistencyCheckJobExistsException(final String jobId) {
+ super(XOpenSQLState.GENERAL_ERROR, 86, "Uncompleted consistency check job `%s` exists", jobId);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
index 05dd63bb70c..f0baaf647c8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
@@ -21,7 +21,7 @@ import lombok.Data;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Yaml data check job progress.
+ * Yaml data consistency check job progress.
*/
@Data
public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
index 1df7d29a532..a2785126509 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/generator/PipelineDDLGenerator.java
@@ -72,12 +72,14 @@ public final class PipelineDDLGenerator {
*/
public String generateLogicDDL(final DatabaseType databaseType, final DataSource sourceDataSource,
final String schemaName, final String sourceTableName, final String targetTableName, final ShardingSphereSQLParserEngine parserEngine) throws SQLException {
- log.info("generateLogicDDLSQL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName, sourceTableName, targetTableName);
+ log.info("generateLogicDDL, databaseType={}, schemaName={}, sourceTableName={}, targetTableName={}", databaseType.getType(), schemaName, sourceTableName, targetTableName);
+ long startTimeMillis = System.currentTimeMillis();
StringBuilder result = new StringBuilder();
for (String each : CreateTableSQLGeneratorFactory.getInstance(databaseType).generate(sourceDataSource, schemaName, sourceTableName)) {
Optional<String> queryContext = decorate(databaseType, sourceDataSource, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(ddlSQL -> result.append(ddlSQL).append(DELIMITER).append(System.lineSeparator()));
}
+ log.info("generateLogicDDL cost {} ms", System.currentTimeMillis() - startTimeMillis);
return result.toString();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 81a35c9f7e5..190bf24e353 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -118,7 +118,7 @@ public final class PipelineMetaDataNode {
}
/**
- * Get check latest detailed result path.
+ * Get check latest job id path.
*
* @param jobId job id
* @return check latest job id path
@@ -128,7 +128,7 @@ public final class PipelineMetaDataNode {
}
/**
- * Get check latest result path.
+ * Get check job result path.
*
* @param jobId job id
* @param checkJobId check job id
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index fd8354ab791..f14e56d3e71 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -73,11 +73,13 @@ public final class InventoryTaskSplitter {
*/
public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemContext jobItemContext) {
List<InventoryTask> result = new LinkedList<>();
+ long startTimeMillis = System.currentTimeMillis();
PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, dumperConfig)) {
result.add(new InventoryTask(each, importerConfig, pipelineChannelCreator, jobItemContext.getDataSourceManager(), sourceDataSource, jobItemContext.getSourceMetaDataLoader(),
jobItemContext.getJobProcessContext().getInventoryDumperExecuteEngine(), jobItemContext.getJobProcessContext().getInventoryImporterExecuteEngine(), jobItemContext));
}
+ log.info("splitInventoryData cost {} ms", System.currentTimeMillis() - startTimeMillis);
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index d1ea79d9981..c58c67eaad7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourc
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
+import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
import org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializerFactory;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -47,11 +48,8 @@ import org.apache.shardingsphere.parser.rule.SQLParserRule;
import javax.sql.DataSource;
import java.sql.SQLException;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Optional;
-import java.util.Set;
/**
* Pipeline job preparer utils.
@@ -59,8 +57,6 @@ import java.util.Set;
@Slf4j
public final class PipelineJobPreparerUtils {
- private static final Set<String> INCREMENTAL_SUPPORTED_DATABASES = new HashSet<>(Arrays.asList("MySQL", "PostgreSQL", "openGauss"));
-
/**
* Is incremental supported.
*
@@ -68,8 +64,7 @@ public final class PipelineJobPreparerUtils {
* @return true if supported, otherwise false
*/
public static boolean isIncrementalSupported(final String databaseType) {
- // TODO check by IncrementalDumperCreator SPI
- return INCREMENTAL_SUPPORTED_DATABASES.contains(databaseType);
+ return IncrementalDumperCreatorFactory.findInstance(databaseType).isPresent();
}
/**
@@ -85,7 +80,9 @@ public final class PipelineJobPreparerUtils {
log.info("dataSourcePreparer null, ignore prepare target");
return;
}
+ long startTimeMillis = System.currentTimeMillis();
dataSourcePreparer.get().prepareTargetSchemas(prepareTargetSchemasParameter);
+ log.info("prepareTargetSchema cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
/**
@@ -113,7 +110,9 @@ public final class PipelineJobPreparerUtils {
log.info("dataSourcePreparer null, ignore prepare target");
return;
}
+ long startTimeMillis = System.currentTimeMillis();
dataSourcePreparer.get().prepareTargetTables(prepareTargetTablesParameter);
+ log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
/**
@@ -135,7 +134,10 @@ public final class PipelineJobPreparerUtils {
}
String databaseType = dumperConfig.getDataSourceConfig().getDatabaseType().getType();
DataSource dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- return PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
+ long startTimeMillis = System.currentTimeMillis();
+ IngestPosition<?> result = PositionInitializerFactory.getInstance(databaseType).init(dataSource, dumperConfig.getJobId());
+ log.info("getIncrementalPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
+ return result;
}
/**
@@ -149,10 +151,12 @@ public final class PipelineJobPreparerUtils {
log.info("source data source is empty, skip check");
return;
}
+ final long startTimeMillis = System.currentTimeMillis();
DataSourceChecker dataSourceChecker = DataSourceCheckerFactory.getInstance(databaseType);
dataSourceChecker.checkConnection(dataSources);
dataSourceChecker.checkPrivilege(dataSources);
dataSourceChecker.checkVariable(dataSources);
+ log.info("checkSourceDataSource cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
/**
@@ -168,8 +172,10 @@ public final class PipelineJobPreparerUtils {
log.info("target data source is empty, skip check");
return;
}
+ long startTimeMillis = System.currentTimeMillis();
dataSourceChecker.checkConnection(targetDataSources);
dataSourceChecker.checkTargetTable(targetDataSources, importerConfig.getTableNameSchemaNameMapping(), importerConfig.getLogicTableNames());
+ log.info("checkTargetDataSource cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
/**
@@ -182,6 +188,7 @@ public final class PipelineJobPreparerUtils {
public static void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
PositionInitializer positionInitializer = PositionInitializerFactory.getInstance(databaseType.getType());
+ final long startTimeMillis = System.currentTimeMillis();
log.info("Cleanup database type:{}, data source type:{}", databaseType.getType(), pipelineDataSourceConfig.getType());
if (pipelineDataSourceConfig instanceof ShardingSpherePipelineDataSourceConfiguration) {
ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
@@ -199,5 +206,6 @@ public final class PipelineJobPreparerUtils {
positionInitializer.destroy(dataSource, jobId);
}
}
+ log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 03889561674..9e7363970a0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -46,7 +46,7 @@ import java.util.concurrent.CompletableFuture;
* Incremental task.
*/
@Slf4j
-@ToString(exclude = {"incrementalDumperExecuteEngine", "importerExecuteEngine", "channel", "dumper", "importers", "taskProgress"})
+@ToString(exclude = {"incrementalExecuteEngine", "channel", "dumper", "importers", "taskProgress"})
public final class IncrementalTask implements PipelineTask, AutoCloseable {
@Getter
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index ea606b8e296..8d71eae5a7c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEve
import java.util.concurrent.CompletableFuture;
/**
- * Consistency check job configuration changed processor.
+ * Consistency check changed job configuration processor.
*/
@Slf4j
public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 05dc69d7467..e00a4e6359c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -59,14 +59,13 @@ public final class ConsistencyCheckJob extends AbstractPipelineJob implements Si
jobAPI.persistJobItemProgress(jobItemContext);
String parentJobId = consistencyCheckJobConfig.getParentJobId();
log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, parentJobId);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(parentJobId, checkJobId);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = Collections.emptyMap();
try {
dataConsistencyCheckResult = StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())
? jobPublicAPI.dataConsistencyCheck(parentJobId)
- : jobPublicAPI.dataConsistencyCheck(parentJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProperties());
+ : jobPublicAPI.dataConsistencyCheck(parentJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProps());
status = JobStatus.FINISHED;
} catch (final SQLWrapperException ex) {
log.error("data consistency check failed", ex);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
index dca95957b5c..d8495582568 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPI.java
@@ -25,5 +25,4 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
* Consistency check job API.
*/
public interface ConsistencyCheckJobAPI extends ConsistencyCheckJobPublicAPI, PipelineJobAPI, PipelineJobItemAPI {
-
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 4ad73f6ee91..c2e88634d30 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -39,9 +39,9 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyExistedException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyFinishedException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -63,28 +63,31 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
ConsistencyCheckJobId jobId = (ConsistencyCheckJobId) pipelineJobId;
- return jobId.getPipelineJobId() + jobId.getSequence();
+ return jobId.getParentJobId() + jobId.getSequence();
}
@Override
public String createJobAndStart(final CreateConsistencyCheckJobParameter parameter) {
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
- Optional<String> optional = repositoryAPI.getCheckLatestJobId(parameter.getJobId());
- if (optional.isPresent()) {
- PipelineJobItemProgress progress = getJobItemProgress(optional.get(), 0);
- if (null != progress && JobStatus.FINISHED != progress.getStatus()) {
- log.info("check job already existed and status isn't FINISHED, status {}", progress.getStatus());
- throw new PipelineJobHasAlreadyExistedException(optional.get());
+ String parentJobId = parameter.getJobId();
+ Optional<String> checkLatestJobId = repositoryAPI.getCheckLatestJobId(parentJobId);
+ if (checkLatestJobId.isPresent()) {
+ PipelineJobItemProgress progress = getJobItemProgress(checkLatestJobId.get(), 0);
+ if (null == progress || JobStatus.FINISHED != progress.getStatus()) {
+ log.info("check job already exists and status is not FINISHED, progress={}", progress);
+ throw new UncompletedConsistencyCheckJobExistsException(checkLatestJobId.get());
}
}
- int consistencyCheckVersionNew = optional.map(s -> ConsistencyCheckJobId.getSequence(s) + 1).orElse(0);
+ int sequence = checkLatestJobId.map(s -> ConsistencyCheckJobId.parseSequence(s) + 1).orElse(ConsistencyCheckJobId.MIN_SEQUENCE);
+ String result = marshalJobId(new ConsistencyCheckJobId(parentJobId, sequence));
+ repositoryAPI.persistCheckLatestJobId(parentJobId, result);
+ repositoryAPI.deleteCheckJobResult(parentJobId, result);
+ dropJob(result);
YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
- ConsistencyCheckJobId checkJobId = new ConsistencyCheckJobId(parameter.getJobId(), consistencyCheckVersionNew);
- String result = marshalJobId(checkJobId);
yamlConfig.setJobId(result);
- yamlConfig.setParentJobId(parameter.getJobId());
+ yamlConfig.setParentJobId(parentJobId);
yamlConfig.setAlgorithmTypeName(parameter.getAlgorithmTypeName());
- yamlConfig.setAlgorithmProperties(parameter.getAlgorithmProps());
+ yamlConfig.setAlgorithmProps(parameter.getAlgorithmProps());
ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig);
start(jobConfig);
return result;
@@ -108,7 +111,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
}
@Override
- public PipelineJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+ public ConsistencyCheckJobProgress getJobItemProgress(final String jobId, final int shardingItem) {
String progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
if (StringUtils.isBlank(progress)) {
return null;
@@ -121,7 +124,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
- ConsistencyCheckJobProgress jobProgress = (ConsistencyCheckJobProgress) getJobItemProgress(jobId, shardingItem);
+ ConsistencyCheckJobProgress jobProgress = getJobItemProgress(jobId, shardingItem);
if (null == jobProgress) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
@@ -135,6 +138,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
log.info("Start disable check job {}", jobId);
PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
ShardingSpherePreconditions.checkState(null == jobProgress || JobStatus.FINISHED != jobProgress.getStatus(), () -> new PipelineJobHasAlreadyFinishedException(jobId));
+ super.startDisabledJob(jobId);
}
@Override
@@ -170,21 +174,22 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
@Override
public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
+ throw new UnsupportedOperationException();
}
@Override
public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
protected PipelineJobInfo getJobInfo(final String jobId) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
index 9e8d3b2b87b..727d83dedb6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobId.java
@@ -32,26 +32,28 @@ public final class ConsistencyCheckJobId extends AbstractPipelineJobId {
public static final String CURRENT_VERSION = "01";
- private static final int MAX_CONSISTENCY_CHECK_VERSION = 9;
+ public static final int MIN_SEQUENCE = 1;
- private final String pipelineJobId;
+ private static final int MAX_SEQUENCE = 3;
+
+ private final String parentJobId;
private final int sequence;
- public ConsistencyCheckJobId(final @NonNull String pipelineJobId, final int sequence) {
+ public ConsistencyCheckJobId(final @NonNull String parentJobId, final int sequence) {
super(JobType.CONSISTENCY_CHECK, CURRENT_VERSION);
- this.pipelineJobId = pipelineJobId;
- this.sequence = sequence > MAX_CONSISTENCY_CHECK_VERSION ? 0 : sequence;
+ this.parentJobId = parentJobId;
+ this.sequence = sequence > MAX_SEQUENCE ? MIN_SEQUENCE : sequence;
}
/**
- * Get consistency check version.
+ * Parse consistency check sequence.
*
- * @param consistencyCheckJobId consistency check job id.
+ * @param checkJobId consistency check job id
* @return sequence
*/
- public static int getSequence(final @NonNull String consistencyCheckJobId) {
- String versionString = consistencyCheckJobId.substring(consistencyCheckJobId.length() - 1);
+ public static int parseSequence(final @NonNull String checkJobId) {
+ String versionString = checkJobId.substring(checkJobId.length() - 1);
return Integer.parseInt(versionString);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index b3471ac39b4..fb5841d9fde 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -54,6 +54,6 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
@Override
public PipelineProcessContext getJobProcessContext() {
- throw new UnsupportedOperationException("");
+ throw new UnsupportedOperationException();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 58ba577aa8d..67fd3c59616 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -88,7 +88,9 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
private void prepare(final MigrationJobItemContext jobItemContext) {
try {
+ long startTimeMillis = System.currentTimeMillis();
jobPreparer.prepare(jobItemContext);
+ log.info("prepare cost {} ms", System.currentTimeMillis() - startTimeMillis);
// CHECKSTYLE:OFF
} catch (final SQLException | RuntimeException ex) {
// CHECKSTYLE:ON
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 9d3d1f91e2e..e88940af179 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -255,9 +255,25 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
@Override
public void rollback(final String jobId) throws SQLException {
log.info("Rollback job {}", jobId);
+ final long startTimeMillis = System.currentTimeMillis();
+ dropCheckJobs(jobId);
stop(jobId);
cleanTempTableOnRollback(jobId);
dropJob(jobId);
+ log.info("Rollback cost {} ms", System.currentTimeMillis() - startTimeMillis);
+ }
+
+ private void dropCheckJobs(final String jobId) {
+ Collection<String> checkJobIds = PipelineAPIFactory.getGovernanceRepositoryAPI().listCheckJobIds(jobId);
+ if (checkJobIds.isEmpty()) {
+ return;
+ }
+ log.info("dropCheckJobs start...");
+ long startTimeMillis = System.currentTimeMillis();
+ for (String each : checkJobIds) {
+ dropJob(each);
+ }
+ log.info("dropCheckJobs cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
private void cleanTempTableOnRollback(final String jobId) throws SQLException {
@@ -281,8 +297,11 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
public void commit(final String jobId) {
checkModeConfig();
log.info("Commit job {}", jobId);
+ final long startTimeMillis = System.currentTimeMillis();
+ dropCheckJobs(jobId);
stop(jobId);
dropJob(jobId);
+ log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index bf698ad74e0..d7c397c7f4d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -74,7 +74,6 @@ public final class MigrationJobPreparer {
PipelineJobCenter.stop(jobItemContext.getJobId());
return;
}
- // TODO check metadata
if (PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType())) {
initIncrementalTasks(jobItemContext);
if (jobItemContext.isStopping()) {
@@ -84,6 +83,8 @@ public final class MigrationJobPreparer {
}
}
initInventoryTasks(jobItemContext);
+ log.info("prepare, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
+ jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
}
private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItemContext) throws SQLException {
@@ -94,8 +95,9 @@ public final class MigrationJobPreparer {
if (null == JOB_API.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem())) {
JOB_API.persistJobItemProgress(jobItemContext);
}
+ long startTimeMillis = System.currentTimeMillis();
if (lockContext.tryLock(lockDefinition, 180000)) {
- log.info("try lock success, jobId={}, shardingItem={}", jobConfig.getJobId(), jobItemContext.getShardingItem());
+ log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
try {
InventoryIncrementalJobItemProgress jobItemProgress = JOB_API.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
boolean prepareFlag = JobStatus.PREPARING.equals(jobItemProgress.getStatus()) || JobStatus.RUNNING.equals(jobItemProgress.getStatus())
@@ -111,7 +113,7 @@ public final class MigrationJobPreparer {
}
}
} finally {
- log.info("unlock, jobId={}, shardingItem={}", jobConfig.getJobId(), jobItemContext.getShardingItem());
+ log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
lockContext.unlock(lockDefinition);
}
}
@@ -143,15 +145,12 @@ public final class MigrationJobPreparer {
}
private void initInventoryTasks(final MigrationJobItemContext jobItemContext) {
- log.info("initInventoryTasks, start...");
InventoryDumperConfiguration inventoryDumperConfig = new InventoryDumperConfiguration(jobItemContext.getTaskConfig().getDumperConfig());
PipelineColumnMetaData uniqueKeyColumn = jobItemContext.getJobConfig().getUniqueKeyColumn();
inventoryDumperConfig.setUniqueKey(uniqueKeyColumn.getName());
inventoryDumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperConfig, jobItemContext.getTaskConfig().getImporterConfig());
jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext));
- log.info("initInventoryTasks, jobId={}, shardingItem={}, inventoryTasks={}, incrementalTasks={}",
- jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
}
private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
index 5545cc94dfe..0d0a25a722f 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ConsistencyCheckJobAPIImplTest.java
@@ -61,11 +61,12 @@ public final class ConsistencyCheckJobAPIImplTest {
CreateConsistencyCheckJobParameter parameter = new CreateConsistencyCheckJobParameter(migrationJobId, null, null);
String checkJobId = checkJobAPI.createJobAndStart(parameter);
ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) checkJobAPI.getJobConfiguration(checkJobId);
- String expectCheckJobId = "j0201j0101test0";
+ int expectedSequence = ConsistencyCheckJobId.MIN_SEQUENCE;
+ String expectCheckJobId = "j0201" + migrationJobId + expectedSequence;
assertThat(jobConfig.getJobId(), is(expectCheckJobId));
assertNull(jobConfig.getAlgorithmTypeName());
- int consistencyCheckVersion = ConsistencyCheckJobId.getSequence(expectCheckJobId);
- assertThat(consistencyCheckVersion, is(0));
+ int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
+ assertThat(sequence, is(expectedSequence));
}
@Test
@@ -75,11 +76,11 @@ public final class ConsistencyCheckJobAPIImplTest {
CreateConsistencyCheckJobParameter parameter = new CreateConsistencyCheckJobParameter(jobId.get(), null, null);
String checkJobId = checkJobAPI.createJobAndStart(parameter);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestJobId(jobId.get(), checkJobId);
- Map<String, DataConsistencyCheckResult> expectResult = Collections.singletonMap("t_order", new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(1, 1),
+ Map<String, DataConsistencyCheckResult> expectedCheckResult = Collections.singletonMap("t_order", new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(1, 1),
new DataConsistencyContentCheckResult(true)));
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(jobId.get(), checkJobId, expectResult);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(jobId.get(), checkJobId, expectedCheckResult);
Map<String, DataConsistencyCheckResult> actualCheckResult = checkJobAPI.getLatestDataConsistencyCheckResult(jobId.get());
- assertThat(actualCheckResult.size(), is(expectResult.size()));
- assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(), is(expectResult.get("t_order").getContentCheckResult().isMatched()));
+ assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
+ assertThat(actualCheckResult.get("t_order").getCountCheckResult().isMatched(), is(expectedCheckResult.get("t_order").getContentCheckResult().isMatched()));
}
}