You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/09/23 08:17:55 UTC
[shardingsphere] branch master updated: Extract InventoryIncrementalJobAPI and extend InventoryIncrementalJobPublicAPI for common usage (#21152)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0574a476e84 Extract InventoryIncrementalJobAPI and extend InventoryIncrementalJobPublicAPI for common usage (#21152)
0574a476e84 is described below
commit 0574a476e84a8c7dbea13355f3b941f1d657a165
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Sep 23 16:17:47 2022 +0800
Extract InventoryIncrementalJobAPI and extend InventoryIncrementalJobPublicAPI for common usage (#21152)
* Extract more methods to InventoryIncrementalJobPublicAPI; Add InventoryIncrementalJobAPI
* Add PipelineDataConsistencyChecker
* Rename AbstractInventoryIncrementalJobAPIImpl
---
.../api/InventoryIncrementalJobPublicAPI.java | 43 ++++-
.../data/pipeline/api/MigrationJobPublicAPI.java | 38 ----
.../PipelineDataConsistencyChecker.java} | 25 +--
.../api/InventoryIncrementalJobAPI.java} | 30 +--
.../AbstractInventoryIncrementalJobAPIImpl.java | 203 +++++++++++++++++++++
.../impl/InventoryIncrementalJobPublicAPIImpl.java | 74 --------
.../MigrationChangedJobConfigurationProcessor.java | 7 +-
.../migration/MigrationDataConsistencyChecker.java | 16 +-
.../scenario/migration/MigrationJobAPI.java | 36 +---
.../scenario/migration/MigrationJobAPIImpl.java | 122 ++-----------
.../migration/MigrationProcessContext.java | 6 +
.../MigrationDataConsistencyCheckerTest.java | 4 +-
12 files changed, 299 insertions(+), 305 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index 53b89e72f35..5750e196aca 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -17,14 +17,22 @@
package org.apache.shardingsphere.data.pipeline.api;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
/**
- * Inventory incremental job API.
+ * Inventory incremental job public API.
*/
+@SingletonSPI
public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI, TypedSPI {
/**
@@ -70,4 +78,37 @@ public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI,
* @throws SQLException when commit underlying database data
*/
void commit(String jobId) throws SQLException;
+
+ /**
+ * Get job progress.
+ *
+ * @param jobId job id
+ * @return each sharding item progress
+ */
+ Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String jobId);
+
+ /**
+ * List all data consistency check algorithms from SPI.
+ *
+ * @return data consistency check algorithms
+ */
+ Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms();
+
+ /**
+ * Do data consistency check.
+ *
+ * @param jobId job id
+ * @return each logic table check result
+ */
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
+
+ /**
+ * Do data consistency check.
+ *
+ * @param jobId job id
+ * @param algorithmType algorithm type
+ * @param algorithmProps algorithm props. Nullable
+ * @return each logic table check result
+ */
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index e60b48bfe92..fb8fe96405e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -17,10 +17,7 @@
package org.apache.shardingsphere.data.pipeline.api;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
@@ -29,7 +26,6 @@ import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
/**
* Migration job public API.
@@ -45,40 +41,6 @@ public interface MigrationJobPublicAPI extends InventoryIncrementalJobPublicAPI,
@Override
List<MigrationJobInfo> list();
- /**
- * Get job progress.
- *
- * @param jobId job id
- * @return each sharding item progress
- */
- // TODO add JobProgress
- Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String jobId);
-
- /**
- * List all data consistency check algorithms from SPI.
- *
- * @return data consistency check algorithms
- */
- Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms();
-
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
-
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @param algorithmType algorithm type
- * @param algorithmProps algorithm props. Nullable
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps);
-
/**
* Add migration source resources.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/PipelineDataConsistencyChecker.java
similarity index 56%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/PipelineDataConsistencyChecker.java
index b2e11b30c15..92420d82c63 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/PipelineDataConsistencyChecker.java
@@ -15,21 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.context.AbstractInventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.util.Map;
/**
- * Migration process context.
+ * Pipeline data consistency checker.
*/
-@Getter
-@Slf4j
-public final class MigrationProcessContext extends AbstractInventoryIncrementalProcessContext {
+public interface PipelineDataConsistencyChecker {
- public MigrationProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
- super(jobId, originalProcessConfig);
- }
+ /**
+ * Data consistency check.
+ *
+ * @param calculateAlgorithm calculate algorithm
+ * @return check results. key is logic table name, value is check result.
+ */
+ Map<String, DataConsistencyCheckResult> check(DataConsistencyCalculateAlgorithm calculateAlgorithm);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
similarity index 60%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index 982a99f2e9e..fdf355a8a4f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -15,42 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.migration;
+package org.apache.shardingsphere.data.pipeline.core.api;
-import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
import java.util.Map;
/**
- * Migration job API.
+ * Inventory incremental job API.
*/
-@SingletonSPI
-public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI, RequiredSPI {
-
- @Override
- MigrationJobConfiguration getJobConfiguration(String jobId);
-
- @Override
- MigrationTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
-
- @Override
- MigrationProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
/**
* Get job progress.
*
- * @param jobConfig job configuration
+ * @param pipelineJobConfig job configuration
* @return each sharding item progress
*/
- Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(MigrationJobConfiguration jobConfig);
+ Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(PipelineJobConfiguration pipelineJobConfig);
@Override
InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
@@ -58,10 +42,10 @@ public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI,
/**
* Do data consistency check.
*
- * @param jobConfig job configuration
+ * @param pipelineJobConfig job configuration
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(MigrationJobConfiguration jobConfig);
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig);
/**
* Aggregate data consistency check results.
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
new file mode 100644
index 00000000000..d0be204a7d0
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Abstract inventory incremental job API implementation.
+ */
+@Slf4j
+public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI, InventoryIncrementalJobPublicAPI {
+
+ private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+
+ private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
+
+ private final InventoryIncrementalJobItemAPIImpl jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
+
+ protected abstract String getTargetDatabaseType(PipelineJobConfiguration pipelineJobConfig);
+
+ @Override
+ public abstract InventoryIncrementalProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+
+ @Override
+ public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+ PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+ ShardingSpherePreconditions.checkState(null == existingProcessConfig, CreateExistsProcessConfigurationException::new);
+ processConfigPersistService.persist(getJobType(), processConfig);
+ }
+
+ @Override
+ public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+ // TODO check rateLimiter type match or not
+ YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
+ targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
+ processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+ }
+
+ private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
+ PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+ ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
+ return PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
+ }
+
+ @Override
+ public void dropProcessConfiguration(final String confPath) {
+ String finalConfPath = confPath.trim();
+ PipelineProcessConfigurationUtil.verifyConfPath(confPath);
+ YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
+ PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
+ processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+ }
+
+ @Override
+ public PipelineProcessConfiguration showProcessConfiguration() {
+ PipelineProcessConfiguration result = processConfigPersistService.load(getJobType());
+ result = PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
+ return result;
+ }
+
+ @Override
+ public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final String jobId) {
+ checkModeConfig();
+ return getJobProgress(getJobConfiguration(jobId));
+ }
+
+ @Override
+ public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final PipelineJobConfiguration jobConfig) {
+ String jobId = jobConfig.getJobId();
+ JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
+ InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, each);
+ if (null != jobItemProgress) {
+ jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
+ jobItemProgress.setErrorMessage(getJobItemErrorMessage(jobId, each));
+ }
+ map.put(each, jobItemProgress);
+ }, LinkedHashMap::putAll);
+ }
+
+ @Override
+ public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+ return jobItemAPI.getJobItemProgress(jobId, shardingItem);
+ }
+
+ @Override
+ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+ jobItemAPI.persistJobItemProgress(jobItemContext);
+ }
+
+ @Override
+ public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
+ jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
+ }
+
+ @Override
+ public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
+ checkModeConfig();
+ return DataConsistencyCalculateAlgorithmFactory.getAllInstances().stream().map(each -> {
+ DataConsistencyCheckAlgorithmInfo result = new DataConsistencyCheckAlgorithmInfo();
+ result.setType(each.getType());
+ result.setDescription(each.getDescription());
+ result.setSupportedDatabaseTypes(each.getSupportedDatabaseTypes());
+ return result;
+ }).collect(Collectors.toList());
+ }
+
+ @Override
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
+ checkModeConfig();
+ log.info("Data consistency check for job {}", jobId);
+ PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ return dataConsistencyCheck(jobConfig);
+ }
+
+ @Override
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig) {
+ DataConsistencyCalculateAlgorithm algorithm = DataConsistencyCalculateAlgorithmChooser.choose(
+ DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
+ return dataConsistencyCheck(jobConfig, algorithm);
+ }
+
+ @Override
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType, final Properties algorithmProps) {
+ checkModeConfig();
+ log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
+ PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
+ return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps));
+ }
+
+ private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+ String jobId = jobConfig.getJobId();
+ Map<String, DataConsistencyCheckResult> result = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
+ log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
+ return result;
+ }
+
+ protected abstract PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext);
+
+ @Override
+ public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
+ if (checkResults.isEmpty()) {
+ return false;
+ }
+ for (Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
+ DataConsistencyCheckResult checkResult = entry.getValue();
+ boolean isCountMatched = checkResult.getCountCheckResult().isMatched();
+ boolean isContentMatched = checkResult.getContentCheckResult().isMatched();
+ if (!isCountMatched || !isContentMatched) {
+ log.error("job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
deleted file mode 100644
index 44f48883545..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobPublicAPIImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.api.impl;
-
-import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.process.yaml.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-
-/**
- * Inventory incremental job API implementation.
- */
-public abstract class InventoryIncrementalJobPublicAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobPublicAPI {
-
- private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
-
- private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
-
- @Override
- public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
- PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
- ShardingSpherePreconditions.checkState(null == existingProcessConfig, CreateExistsProcessConfigurationException::new);
- processConfigPersistService.persist(getJobType(), processConfig);
- }
-
- @Override
- public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
- // TODO check rateLimiter type match or not
- YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
- targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
- processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
- }
-
- private YamlPipelineProcessConfiguration getTargetYamlProcessConfiguration() {
- PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
- ShardingSpherePreconditions.checkNotNull(existingProcessConfig, AlterNotExistProcessConfigurationException::new);
- return PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
- }
-
- @Override
- public void dropProcessConfiguration(final String confPath) {
- String finalConfPath = confPath.trim();
- PipelineProcessConfigurationUtil.verifyConfPath(confPath);
- YamlPipelineProcessConfiguration targetYamlProcessConfig = getTargetYamlProcessConfiguration();
- PipelineProcessConfigurationUtil.setFieldsNullByConfPath(targetYamlProcessConfig, finalConfPath);
- processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
- }
-
- @Override
- public PipelineProcessConfiguration showProcessConfiguration() {
- PipelineProcessConfiguration result = processConfigPersistService.load(getJobType());
- result = PipelineProcessConfigurationUtil.convertWithDefaultValue(result);
- return result;
- }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 1333121ec89..8f5b30e7eef 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -52,8 +52,11 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
log.info("{} executing jobs", jobId);
- CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor())
- .whenComplete((unused, throwable) -> log.error("execute failed, jobId={}", jobId, throwable));
+ CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+ if (null != throwable) {
+ log.error("execute failed, jobId={}", jobId, throwable);
+ }
+ });
}
break;
case DELETED:
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index a196b56e552..a0fd5c4ae5a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -30,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -69,7 +71,7 @@ import java.util.concurrent.TimeUnit;
* Data consistency checker for migration job.
*/
@Slf4j
-public final class MigrationDataConsistencyChecker {
+public final class MigrationDataConsistencyChecker implements PipelineDataConsistencyChecker {
private final MigrationJobConfiguration jobConfig;
@@ -81,21 +83,16 @@ public final class MigrationDataConsistencyChecker {
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
- public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
+ public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
this.jobConfig = jobConfig;
sourceTableName = jobConfig.getSourceTableName();
targetTableName = jobConfig.getTargetTableName();
tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
- this.readRateLimitAlgorithm = readRateLimitAlgorithm;
+ this.readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
}
- /**
- * Check data consistency.
- *
- * @param calculator data consistency calculate algorithm
- * @return checked result. key is logic table name, value is check result.
- */
+ @Override
public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculator) {
Map<String, DataConsistencyCountCheckResult> countCheckResult = checkCount();
Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
@@ -113,6 +110,7 @@ public final class MigrationDataConsistencyChecker {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
+ // TODO migration might support multiple tables
Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(1, 1);
try (
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index 982a99f2e9e..efa627cb5e4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -18,22 +18,18 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
-import java.util.Map;
-
/**
* Migration job API.
*/
@SingletonSPI
-public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI, RequiredSPI {
+public interface MigrationJobAPI extends InventoryIncrementalJobAPI, MigrationJobPublicAPI, RequiredSPI {
@Override
MigrationJobConfiguration getJobConfiguration(String jobId);
@@ -43,32 +39,4 @@ public interface MigrationJobAPI extends PipelineJobAPI, MigrationJobPublicAPI,
@Override
MigrationProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
-
- /**
- * Get job progress.
- *
- * @param jobConfig job configuration
- * @return each sharding item progress
- */
- Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(MigrationJobConfiguration jobConfig);
-
- @Override
- InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
-
- /**
- * Do data consistency check.
- *
- * @param jobConfig job configuration
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(MigrationJobConfiguration jobConfig);
-
- /**
- * Aggregate data consistency check results.
- *
- * @param jobId job id
- * @param checkResults check results
- * @return check success or not
- */
- boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResults);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index b4419dafa05..027c13f1a40 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -22,7 +22,7 @@ import com.google.common.base.Strings;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.CreateTableConfiguration.CreateTableEntry;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
@@ -34,7 +34,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigration
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -43,10 +42,8 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
@@ -55,15 +52,10 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.yaml.YamlPipelineColumnMetaDataSwapper;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
-import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobPublicAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractInventoryIncrementalJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
-import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
-import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.connection.AddMigrationSourceResourceException;
@@ -72,8 +64,6 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSche
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
-import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
@@ -107,23 +97,19 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* Migration job API impl.
*/
@Slf4j
-public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPIImpl implements MigrationJobAPI {
+public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAPIImpl implements MigrationJobAPI {
private static final YamlRuleConfigurationSwapperEngine RULE_CONFIG_SWAPPER_ENGINE = new YamlRuleConfigurationSwapperEngine();
private static final YamlDataSourceConfigurationSwapper DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
- private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
-
private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
@Override
@@ -203,6 +189,11 @@ public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPII
return YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
}
+ @Override
+ protected String getTargetDatabaseType(final PipelineJobConfiguration pipelineJobConfig) {
+ return ((MigrationJobConfiguration) pipelineJobConfig).getTargetDatabaseType();
+ }
+
@Override
public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
@@ -265,99 +256,8 @@ public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPII
}
@Override
- public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final String jobId) {
- checkModeConfig();
- return getJobProgress(getJobConfiguration(jobId));
- }
-
- @Override
- public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final MigrationJobConfiguration jobConfig) {
- String jobId = jobConfig.getJobId();
- JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
- InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, each);
- if (null != jobItemProgress) {
- jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
- jobItemProgress.setErrorMessage(getJobItemErrorMessage(jobId, each));
- }
- map.put(each, jobItemProgress);
- }, LinkedHashMap::putAll);
- }
-
- @Override
- public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
- return (InventoryIncrementalJobItemProgress) jobItemAPI.getJobItemProgress(jobId, shardingItem);
- }
-
- @Override
- public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
- jobItemAPI.persistJobItemProgress(jobItemContext);
- }
-
- @Override
- public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
- jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
- }
-
- @Override
- public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
- checkModeConfig();
- return DataConsistencyCalculateAlgorithmFactory.getAllInstances().stream().map(each -> {
- DataConsistencyCheckAlgorithmInfo result = new DataConsistencyCheckAlgorithmInfo();
- result.setType(each.getType());
- result.setDescription(each.getDescription());
- result.setSupportedDatabaseTypes(each.getSupportedDatabaseTypes());
- return result;
- }).collect(Collectors.toList());
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
- checkModeConfig();
- log.info("Data consistency check for job {}", jobId);
- MigrationJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig);
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final MigrationJobConfiguration jobConfig) {
- DataConsistencyCalculateAlgorithm algorithm = DataConsistencyCalculateAlgorithmChooser.choose(
- DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(jobConfig.getTargetDatabaseType()));
- return dataConsistencyCheck(jobConfig, algorithm);
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId, final String algorithmType, final Properties algorithmProps) {
- checkModeConfig();
- log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
- MigrationJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps));
- }
-
- private Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final MigrationJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculator) {
- String jobId = jobConfig.getJobId();
- JobRateLimitAlgorithm readRateLimitAlgorithm = buildPipelineProcessContext(jobConfig).getReadRateLimitAlgorithm();
- Map<String, DataConsistencyCheckResult> result = new MigrationDataConsistencyChecker(jobConfig, readRateLimitAlgorithm).check(calculator);
- log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculator.getType(), result);
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckLatestResult(jobId, aggregateDataConsistencyCheckResults(jobId, result));
- return result;
- }
-
- @Override
- public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, DataConsistencyCheckResult> checkResults) {
- if (checkResults.isEmpty()) {
- return false;
- }
- for (Entry<String, DataConsistencyCheckResult> entry : checkResults.entrySet()) {
- DataConsistencyCheckResult checkResult = entry.getValue();
- boolean isCountMatched = checkResult.getCountCheckResult().isMatched();
- boolean isContentMatched = checkResult.getContentCheckResult().isMatched();
- if (!isCountMatched || !isContentMatched) {
- log.error("job: {}, table: {} data consistency check failed, count matched: {}, content matched: {}", jobId, entry.getKey(), isCountMatched, isContentMatched);
- return false;
- }
- }
- return true;
+ protected PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final InventoryIncrementalProcessContext processContext) {
+ return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext);
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index b2e11b30c15..1122a40ecc0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -29,6 +29,12 @@ import org.apache.shardingsphere.data.pipeline.core.context.AbstractInventoryInc
@Slf4j
public final class MigrationProcessContext extends AbstractInventoryIncrementalProcessContext {
+ /**
+ * Constructor.
+ *
+ * @param jobId job id
+ * @param originalProcessConfig original process configuration, nullable
+ */
public MigrationProcessContext(final String jobId, final PipelineProcessConfiguration originalProcessConfig) {
super(jobId, originalProcessConfig);
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
index bf492b724a2..fa646106a05 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyCheckerTest.java
@@ -45,7 +45,9 @@ public final class MigrationDataConsistencyCheckerTest {
@Test
public void assertCountAndDataCheck() throws SQLException {
- Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(createJobConfiguration(), null).check(new DataConsistencyCalculateAlgorithmFixture());
+ MigrationJobConfiguration jobConfig = createJobConfiguration();
+ Map<String, DataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null))
+ .check(new DataConsistencyCalculateAlgorithmFixture());
assertTrue(actual.get("t_order").getCountCheckResult().isMatched());
assertThat(actual.get("t_order").getCountCheckResult().getSourceRecordsCount(), is(actual.get("t_order").getCountCheckResult().getTargetRecordsCount()));
assertTrue(actual.get("t_order").getContentCheckResult().isMatched());