You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2022/10/10 05:53:23 UTC
[shardingsphere] branch master updated: Support cancel data consistency check and refactoring (#21429)
This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4c778a7eab7 Support cancel data consistency check and refactoring (#21429)
4c778a7eab7 is described below
commit 4c778a7eab7e1c867c1803568fc1ac7030774b1b
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Mon Oct 10 13:53:13 2022 +0800
Support cancel data consistency check and refactoring (#21429)
* Move DataConsistencyCalculateAlgorithmFactory
* Add SingleTableInventoryDataConsistencyChecker
* Refactor DataConsistencyCalculateParameter.tableNameSchemaNameMapping to schemaName
* Add DataConsistencyCalculatedResult, calculate count and content together
* Refactor SingleTableInventoryDataConsistencyChecker by DataConsistencyCalculatedResult
* Remove buildCountSQL from PipelineSQLBuilder
* Unit test
* Revert "Remove buildCountSQL from PipelineSQLBuilder"
This reverts commit ea946fcce577fe2aee72592176657464f4952753.
* Add CRC32_MATCH in MySQLMigrationGeneralIT
* Add log when CRC32 does not match
* Add cancel() for DataConsistencyCalculateAlgorithm and abstract impl
* Recover PipelineTask start stop methods
* Add ConsistencyCheckTasksRunner and refactor ConsistencyCheckJob
* Refactor AbstractPipelineJob to use common JobBootstrap
* Cancel DataConsistencyCalculateAlgorithm on stopping
* Log
---
.../DataConsistencyCalculateParameter.java | 4 +-
.../DataConsistencyCalculatedResult.java | 21 +--
.../DataConsistencyCalculateAlgorithm.java | 12 +-
.../DataConsistencyCalculateAlgorithmFactory.java | 3 +-
.../spi/sqlbuilder/PipelineSQLBuilder.java | 1 +
.../core/api/InventoryIncrementalJobAPI.java | 15 +-
.../AbstractInventoryIncrementalJobAPIImpl.java | 31 ++--
.../DataConsistencyCalculateAlgorithmChooser.java | 1 +
...SingleTableInventoryDataConsistencyChecker.java | 148 ++++++++++++++++++
.../AbstractDataConsistencyCalculateAlgorithm.java | 63 ++++++++
...StreamingDataConsistencyCalculateAlgorithm.java | 20 +--
...RC32MatchDataConsistencyCalculateAlgorithm.java | 76 ++++++++--
...DataMatchDataConsistencyCalculateAlgorithm.java | 27 ++--
.../PipelineJobHasAlreadyFinishedException.java | 2 +-
.../pipeline/core/job/AbstractPipelineJob.java | 4 +-
.../data/pipeline/core/task/IncrementalTask.java | 10 +-
.../data/pipeline/core/task/InventoryTask.java | 10 +-
.../data/pipeline/core/task/PipelineTask.java | 14 ++
...tencyCheckChangedJobConfigurationProcessor.java | 2 +-
.../consistencycheck/ConsistencyCheckJob.java | 61 +++-----
.../ConsistencyCheckTasksRunner.java | 139 +++++++++++++++++
.../MigrationChangedJobConfigurationProcessor.java | 2 +-
.../migration/MigrationDataConsistencyChecker.java | 167 +++------------------
.../pipeline/scenario/migration/MigrationJob.java | 4 +-
...MatchDataConsistencyCalculateAlgorithmTest.java | 17 +--
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 2 +-
.../sqlbuilder/MySQLPipelineSQLBuilderTest.java | 2 +-
.../migration/general/MySQLMigrationGeneralIT.java | 9 +-
...taConsistencyCalculateAlgorithmFactoryTest.java | 1 +
.../DataConsistencyCalculateAlgorithmFixture.java | 9 +-
.../FixtureDataConsistencyCalculatedResult.java} | 19 +--
31 files changed, 598 insertions(+), 298 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 7463fadae10..154643bdd59 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -22,7 +22,6 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -43,8 +42,7 @@ public final class DataConsistencyCalculateParameter {
*/
private final PipelineDataSourceWrapper dataSource;
- // TODO replace to schemaName
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+ private final String schemaName;
private final String logicTableName;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
similarity index 70%
copy from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
index 3d1c2827377..ea34f78d60e 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculatedResult.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.api.check.consistency;
-import org.junit.Test;
-
-import java.util.Properties;
-
-public final class DataConsistencyCalculateAlgorithmFactoryTest {
+/**
+ * Data consistency calculated result.
+ */
+public interface DataConsistencyCalculatedResult {
- @Test
- public void assertNewInstanceSuccess() {
- DataConsistencyCalculateAlgorithmFactory.newInstance("FIXTURE", new Properties());
- }
+ /**
+ * Get records count.
+ *
+ * @return records count
+ */
+ int getRecordsCount();
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
index 368b501ee5a..6ee50d96e10 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithm.java
@@ -18,9 +18,12 @@
package org.apache.shardingsphere.data.pipeline.spi.check.consistency;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
import org.apache.shardingsphere.infra.util.spi.aware.SPIMetadataAware;
+import java.sql.SQLException;
+
/**
* Data consistency calculate algorithm.
*/
@@ -32,5 +35,12 @@ public interface DataConsistencyCalculateAlgorithm extends ShardingSphereAlgorit
* @param parameter data consistency calculate parameter
* @return calculated result
*/
- Iterable<Object> calculate(DataConsistencyCalculateParameter parameter);
+ Iterable<DataConsistencyCalculatedResult> calculate(DataConsistencyCalculateParameter parameter);
+
+ /**
+ * Cancel calculation.
+ *
+ * @throws SQLException SQL exception if cancel underlying SQL execution failure
+ */
+ void cancel() throws SQLException;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
similarity index 92%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
index a37556b792f..8db297edc3c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/check/consistency/DataConsistencyCalculateAlgorithmFactory.java
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.spi.check.consistency;
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmFactory;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 0a1299318a7..c5663ba699b 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -124,6 +124,7 @@ public interface PipelineSQLBuilder extends TypedSPI, RequiredSPI {
* @param tableName table name
* @return count SQL
*/
+ // TODO keep it for now, it might be used later
String buildCountSQL(String schemaName, String tableName);
/**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index fdf355a8a4f..077d08c7281 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -20,8 +20,10 @@ package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import java.util.Map;
+import java.util.Properties;
/**
* Inventory incremental job API.
@@ -39,13 +41,24 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
@Override
InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
+ /**
+ * Build data consistency calculate algorithm.
+ *
+ * @param jobConfig job configuration
+ * @param algorithmType algorithm type
+ * @param algorithmProps algorithm properties
+ * @return calculate algorithm
+ */
+ DataConsistencyCalculateAlgorithm buildDataConsistencyCalculateAlgorithm(PipelineJobConfiguration jobConfig, String algorithmType, Properties algorithmProps);
+
/**
* Do data consistency check.
*
* @param pipelineJobConfig job configuration
+ * @param calculateAlgorithm calculate algorithm
* @return each logic table check result
*/
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig);
+ Map<String, DataConsistencyCheckResult> dataConsistencyCheck(PipelineJobConfiguration pipelineJobConfig, DataConsistencyCalculateAlgorithm calculateAlgorithm);
/**
* Aggregate data consistency check results.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index acb875a682a..b98eb85cf8f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -23,20 +23,20 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmChooser;
-import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.data.pipeline.core.config.process.PipelineProcessConfigurationUtil;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.process.YamlPipelineProcessConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -150,19 +150,23 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
}).collect(Collectors.toList());
}
+ @Override
+ public DataConsistencyCalculateAlgorithm buildDataConsistencyCalculateAlgorithm(final PipelineJobConfiguration jobConfig, final String algorithmType, final Properties algorithmProps) {
+ ShardingSpherePreconditions.checkState(null != algorithmType || null != jobConfig, () -> new IllegalArgumentException("algorithmType and jobConfig are null"));
+ if (null != algorithmType) {
+ return DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps);
+ }
+ return DataConsistencyCalculateAlgorithmChooser.choose(
+ DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
+ }
+
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final String jobId) {
checkModeConfig();
log.info("Data consistency check for job {}", jobId);
PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig);
- }
-
- @Override
- public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig) {
- DataConsistencyCalculateAlgorithm algorithm = DataConsistencyCalculateAlgorithmChooser.choose(
- DatabaseTypeFactory.getInstance(jobConfig.getSourceDatabaseType()), DatabaseTypeFactory.getInstance(getTargetDatabaseType(jobConfig)));
- return dataConsistencyCheck(jobConfig, algorithm);
+ DataConsistencyCalculateAlgorithm calculateAlgorithm = buildDataConsistencyCalculateAlgorithm(jobConfig, null, null);
+ return dataConsistencyCheck(jobConfig, calculateAlgorithm);
}
@Override
@@ -170,10 +174,11 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
checkModeConfig();
log.info("Data consistency check for job {}, algorithmType: {}", jobId, algorithmType);
PipelineJobConfiguration jobConfig = getJobConfiguration(getElasticJobConfigPOJO(jobId));
- return dataConsistencyCheck(jobConfig, DataConsistencyCalculateAlgorithmFactory.newInstance(algorithmType, algorithmProps));
+ return dataConsistencyCheck(jobConfig, buildDataConsistencyCalculateAlgorithm(jobConfig, algorithmType, algorithmProps));
}
- protected Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+ @Override
+ public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final PipelineJobConfiguration jobConfig, final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
String jobId = jobConfig.getJobId();
Map<String, DataConsistencyCheckResult> result = buildPipelineDataConsistencyChecker(jobConfig, buildPipelineProcessContext(jobConfig)).check(calculateAlgorithm);
log.info("job {} with check algorithm '{}' data consistency checker result {}", jobId, calculateAlgorithm.getType(), result);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
index 5e50032e3eb..499dfa006c5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmChooser.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
new file mode 100644
index 00000000000..3125fc778fc
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Single table inventory data consistency checker.
+ *
+ * <p>Runs the given calculate algorithm against one source table and one target table,
+ * then compares the calculated results chunk by chunk while accumulating the records count of both sides.</p>
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class SingleTableInventoryDataConsistencyChecker {
+
+    private final String jobId;
+
+    private final PipelineDataSourceWrapper sourceDataSource;
+
+    private final PipelineDataSourceWrapper targetDataSource;
+
+    private final SchemaTableName sourceTable;
+
+    private final SchemaTableName targetTable;
+
+    private final PipelineColumnMetaData uniqueKey;
+
+    private final PipelineTableMetaDataLoader metaDataLoader;
+
+    // Optional, null means reading is not rate limited.
+    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+
+    /**
+     * Data consistency check.
+     *
+     * <p>A dedicated two-thread executor is created for this check so that the source and target chunks
+     * are calculated concurrently; the executor is always terminated before this method returns.</p>
+     *
+     * @param calculateAlgorithm calculate algorithm
+     * @return data consistency check result
+     */
+    public DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+        ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + "-check-%d");
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+        try {
+            return check(calculateAlgorithm, executor);
+        } finally {
+            // shutdownNow() already rejects new tasks and interrupts running ones, a preceding shutdown() adds nothing.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Calculate source and target results in parallel and compare them chunk by chunk.
+     *
+     * <p>Comparison stops at the first chunk that does not match. The returned records counts only cover
+     * the chunks that were actually compared.</p>
+     *
+     * @param calculateAlgorithm calculate algorithm
+     * @param executor executor used to fetch the source and target chunk concurrently
+     * @return data consistency check result
+     */
+    private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor) {
+        String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
+        String targetDatabaseType = targetDataSource.getDatabaseType().getType();
+        String sourceSchemaName = sourceTable.getSchemaName().getOriginal();
+        String sourceTableName = sourceTable.getTableName().getOriginal();
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(sourceSchemaName, sourceTableName);
+        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
+        // Column names are loaded from the source table and reused for the target table.
+        Collection<String> columnNames = tableMetaData.getColumnNames();
+        DataConsistencyCalculateParameter sourceParameter = buildParameter(
+                sourceDataSource, sourceSchemaName, sourceTableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+        DataConsistencyCalculateParameter targetParameter = buildParameter(
+                targetDataSource, targetTable.getSchemaName().getOriginal(), targetTable.getTableName().getOriginal(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
+        Iterator<DataConsistencyCalculatedResult> sourceCalculatedResults = calculateAlgorithm.calculate(sourceParameter).iterator();
+        Iterator<DataConsistencyCalculatedResult> targetCalculatedResults = calculateAlgorithm.calculate(targetParameter).iterator();
+        long sourceRecordsCount = 0;
+        long targetRecordsCount = 0;
+        boolean contentMatched = true;
+        while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
+            if (null != readRateLimitAlgorithm) {
+                readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+            }
+            Future<DataConsistencyCalculatedResult> sourceFuture = executor.submit(sourceCalculatedResults::next);
+            Future<DataConsistencyCalculatedResult> targetFuture = executor.submit(targetCalculatedResults::next);
+            DataConsistencyCalculatedResult sourceCalculatedResult = waitFuture(sourceFuture);
+            DataConsistencyCalculatedResult targetCalculatedResult = waitFuture(targetFuture);
+            sourceRecordsCount += sourceCalculatedResult.getRecordsCount();
+            targetRecordsCount += targetCalculatedResult.getRecordsCount();
+            contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
+            if (!contentMatched) {
+                log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
+                break;
+            }
+        }
+        // The loop ends as soon as one side is exhausted; if the other side still has chunks, the tables cannot be identical.
+        if (contentMatched && (sourceCalculatedResults.hasNext() || targetCalculatedResults.hasNext())) {
+            contentMatched = false;
+            log.info("content matched false since calculated results count differs, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
+        }
+        return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
+    }
+
+    // TODO use digest (crc32, murmurhash)
+    // For now the "digest" is just the first 6 characters of the job id, used to keep thread names short.
+    private String getJobIdDigest(final String jobId) {
+        return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
+    }
+
+    // Used for both the source and the target side, so the data source parameter is named neutrally.
+    private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper dataSource,
+                                                             final String schemaName, final String tableName, final Collection<String> columnNames,
+                                                             final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey) {
+        return new DataConsistencyCalculateParameter(dataSource, schemaName, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
+    }
+
+    /**
+     * Wait for the future and unwrap its failure.
+     *
+     * @param future future to wait for
+     * @param <T> type of result
+     * @return result of the future
+     * @throws PipelineSQLException when the task failed with a pipeline SQL exception, it is rethrown as is
+     * @throws SQLWrapperException when the task failed with any other cause, or the waiting thread was interrupted
+     */
+    private <T> T waitFuture(final Future<T> future) {
+        try {
+            return future.get();
+        } catch (final InterruptedException ex) {
+            // Restore the interrupt flag so that callers up the stack can observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new SQLWrapperException(new SQLException(ex));
+        } catch (final ExecutionException ex) {
+            if (ex.getCause() instanceof PipelineSQLException) {
+                throw (PipelineSQLException) ex.getCause();
+            }
+            throw new SQLWrapperException(new SQLException(ex));
+        }
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
new file mode 100644
index 00000000000..7e14ced4342
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractDataConsistencyCalculateAlgorithm.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+
+/**
+ * Abstract data consistency calculate algorithm.
+ *
+ * <p>Keeps a canceling flag and a reference to the statement registered by the subclass,
+ * so that {@link #cancel()} can ask the JDBC driver to cancel that statement.</p>
+ */
+@Slf4j
+public abstract class AbstractDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
+
+    // Set to true by cancel() and never reset within this class.
+    @Getter(AccessLevel.PROTECTED)
+    private volatile boolean canceling;
+
+    // Latest statement registered through setCurrentStatement(), may be null.
+    private volatile Statement currentStatement;
+
+    /**
+     * Register the statement that cancel() should act on and hand it back to the caller.
+     *
+     * @param statement statement to register
+     * @param <T> type of statement
+     * @return the same statement instance
+     */
+    protected <T extends Statement> T setCurrentStatement(final T statement) {
+        currentStatement = statement;
+        return statement;
+    }
+
+    @Override
+    public void cancel() throws SQLException {
+        canceling = true;
+        // Read the volatile field once so that the null check and the cancel call see the same statement.
+        Statement runningStatement = currentStatement;
+        boolean cancellable = null != runningStatement && !runningStatement.isClosed();
+        if (!cancellable) {
+            log.info("cancel, statement is null or closed");
+            return;
+        }
+        long beginTimeMillis = System.currentTimeMillis();
+        try {
+            runningStatement.cancel();
+        } catch (final SQLFeatureNotSupportedException ex) {
+            log.info("cancel is not supported: {}", ex.getMessage());
+        } catch (final SQLException ex) {
+            // Best effort: a failed cancel is only logged, it is not propagated to the caller.
+            log.info("cancel failed: {}", ex.getMessage());
+        }
+        long costMillis = System.currentTimeMillis() - beginTimeMillis;
+        log.info("cancel cost {} ms", costMillis);
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
index e35afffba1c..c1c18347e88 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/AbstractStreamingDataConsistencyCalculateAlgorithm.java
@@ -21,7 +21,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import java.util.Iterator;
import java.util.Optional;
@@ -33,10 +33,10 @@ import java.util.concurrent.atomic.AtomicInteger;
@RequiredArgsConstructor
@Getter
@Slf4j
-public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
+public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
@Override
- public final Iterable<Object> calculate(final DataConsistencyCalculateParameter parameter) {
+ public final Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
return new ResultIterable(parameter);
}
@@ -46,30 +46,30 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm impleme
* @param parameter data consistency calculate parameter
* @return optional calculated result, empty means there's no more result
*/
- protected abstract Optional<Object> calculateChunk(DataConsistencyCalculateParameter parameter);
+ protected abstract Optional<DataConsistencyCalculatedResult> calculateChunk(DataConsistencyCalculateParameter parameter);
/**
* It's not thread-safe, it should be executed in only one thread at the same time.
*/
@RequiredArgsConstructor
- final class ResultIterable implements Iterable<Object> {
+ final class ResultIterable implements Iterable<DataConsistencyCalculatedResult> {
private final DataConsistencyCalculateParameter parameter;
@Override
- public Iterator<Object> iterator() {
+ public Iterator<DataConsistencyCalculatedResult> iterator() {
return new ResultIterator(parameter);
}
}
@RequiredArgsConstructor
- final class ResultIterator implements Iterator<Object> {
+ final class ResultIterator implements Iterator<DataConsistencyCalculatedResult> {
private final DataConsistencyCalculateParameter parameter;
private final AtomicInteger calculationCount = new AtomicInteger(0);
- private volatile Optional<Object> nextResult;
+ private volatile Optional<DataConsistencyCalculatedResult> nextResult;
@Override
public boolean hasNext() {
@@ -78,9 +78,9 @@ public abstract class AbstractStreamingDataConsistencyCalculateAlgorithm impleme
}
@Override
- public Object next() {
+ public DataConsistencyCalculatedResult next() {
calculateIfNecessary();
- Optional<Object> nextResult = this.nextResult;
+ Optional<DataConsistencyCalculatedResult> nextResult = this.nextResult;
parameter.setPreviousCalculatedResult(nextResult.orElse(null));
this.nextResult = null;
return nextResult.orElse(null);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index b2eeac94959..230298808c9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -18,11 +18,14 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -34,6 +37,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -41,11 +45,12 @@ import java.util.stream.Collectors;
/**
* CRC32 match data consistency calculate algorithm.
*/
-@Getter
-public final class CRC32MatchDataConsistencyCalculateAlgorithm implements DataConsistencyCalculateAlgorithm {
+@Slf4j
+public final class CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractDataConsistencyCalculateAlgorithm {
private static final Collection<String> SUPPORTED_DATABASE_TYPES = Collections.singletonList(new MySQLDatabaseType().getType());
+ @Getter
private Properties props;
@Override
@@ -54,26 +59,29 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithm implements DataCo
}
@Override
- public Iterable<Object> calculate(final DataConsistencyCalculateParameter parameter) {
+ public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
- return Collections.unmodifiableList(parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList()));
+ List<CalculatedItem> calculatedItems = parameter.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, parameter, each)).collect(Collectors.toList());
+ return Collections.singletonList(new CalculatedResult(calculatedItems.get(0).getRecordsCount(), calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
}
- private long calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
+ private CalculatedItem calculateCRC32(final PipelineSQLBuilder sqlBuilder, final DataConsistencyCalculateParameter parameter, final String columnName) {
String logicTableName = parameter.getLogicTableName();
- String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
+ String schemaName = parameter.getSchemaName();
Optional<String> sql = sqlBuilder.buildCRC32SQL(schemaName, logicTableName, columnName);
ShardingSpherePreconditions.checkState(sql.isPresent(), () -> new UnsupportedCRC32DataConsistencyCalculateAlgorithmException(parameter.getDatabaseType()));
return calculateCRC32(parameter.getDataSource(), logicTableName, sql.get());
}
- private long calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql) {
+ private CalculatedItem calculateCRC32(final DataSource dataSource, final String logicTableName, final String sql) {
try (
Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql));
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
- return resultSet.getLong(1);
+ long crc32 = resultSet.getLong(1);
+ int recordsCount = resultSet.getInt(2);
+ return new CalculatedItem(crc32, recordsCount);
} catch (final SQLException ex) {
throw new PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
}
@@ -93,4 +101,52 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithm implements DataCo
public String getDescription() {
return "Match CRC32 of records.";
}
+
+ @RequiredArgsConstructor
+ @Getter
+ private static final class CalculatedItem { // Values read from the single row of one per-column CRC32 query.
+
+ private final long crc32; // Read from result set column 1 in calculateCRC32.
+
+ private final int recordsCount; // Read from result set column 2 in calculateCRC32.
+ }
+
+ /**
+  * Calculated result of the CRC32 match algorithm: a records count plus one CRC32 value per column.
+  */
+ @RequiredArgsConstructor
+ @Getter
+ private static final class CalculatedResult implements DataConsistencyCalculatedResult {
+
+     private final int recordsCount;
+
+     @NonNull
+     private final Collection<Long> columnsCrc32;
+
+     /**
+      * Compare records count and columns CRC32 with another calculated result, logging which part differs.
+      *
+      * @param o object to compare with, may be null
+      * @return true if {@code o} is a {@code CalculatedResult} with equal records count and equal columns CRC32
+      */
+     @Override
+     public boolean equals(final Object o) {
+         if (this == o) {
+             return true;
+         }
+         // Object#equals must return false for null instead of throwing, so null is not rejected with @NonNull.
+         if (null == o) {
+             return false;
+         }
+         if (getClass() != o.getClass()) {
+             log.warn("CalculatedResult type not match, o.className={}", o.getClass().getName());
+             return false;
+         }
+         final CalculatedResult that = (CalculatedResult) o;
+         if (recordsCount != that.recordsCount) {
+             log.info("recordsCount not match, recordsCount={}, that.recordsCount={}", recordsCount, that.recordsCount);
+             return false;
+         }
+         if (!columnsCrc32.equals(that.columnsCrc32)) {
+             log.info("columnsCrc32 not match, columnsCrc32={}, that.columnsCrc32={}", columnsCrc32, that.columnsCrc32);
+             return false;
+         }
+         return true;
+     }
+
+     /**
+      * Hash code built from the same two fields that {@link #equals(Object)} compares.
+      *
+      * @return hash code
+      */
+     @Override
+     public int hashCode() {
+         return 31 * recordsCount + columnsCrc32.hashCode();
+     }
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index da0dbf28569..a2df6b70b79 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCheckUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.ColumnValueReaderFactory;
@@ -87,12 +88,12 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
}
@Override
- protected Optional<Object> calculateChunk(final DataConsistencyCalculateParameter parameter) {
+ protected Optional<DataConsistencyCalculatedResult> calculateChunk(final DataConsistencyCalculateParameter parameter) {
CalculatedResult previousCalculatedResult = (CalculatedResult) parameter.getPreviousCalculatedResult();
String sql = getQuerySQL(parameter);
try (
Connection connection = parameter.getDataSource().getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ PreparedStatement preparedStatement = setCurrentStatement(connection.prepareStatement(sql))) {
preparedStatement.setFetchSize(chunkSize);
if (null == previousCalculatedResult) {
preparedStatement.setInt(1, chunkSize);
@@ -105,6 +106,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
try (ResultSet resultSet = preparedStatement.executeQuery()) {
ColumnValueReader columnValueReader = ColumnValueReaderFactory.getInstance(parameter.getDatabaseType());
while (resultSet.next()) {
+ if (isCanceling()) {
+ log.info("canceling, schemaName={}, tableName={}", parameter.getSchemaName(), parameter.getLogicTableName());
+ throw new PipelineTableDataConsistencyCheckLoadingFailedException(parameter.getLogicTableName());
+ }
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int columnCount = resultSetMetaData.getColumnCount();
Collection<Object> record = new LinkedList<>();
@@ -124,11 +129,11 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
private String getQuerySQL(final DataConsistencyCalculateParameter parameter) {
PipelineSQLBuilder sqlBuilder = PipelineSQLBuilderFactory.getInstance(parameter.getDatabaseType());
String logicTableName = parameter.getLogicTableName();
- String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
+ String schemaName = parameter.getSchemaName();
String uniqueKey = parameter.getUniqueKey().getName();
- String cacheKey = parameter.getDatabaseType() + "-" + (DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
- ? schemaName.toLowerCase() + "." + logicTableName.toLowerCase()
- : logicTableName.toLowerCase());
+ String cacheKey = parameter.getDatabaseType() + "-" + (null != schemaName && DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
+ ? schemaName + "." + logicTableName
+ : logicTableName);
if (null == parameter.getPreviousCalculatedResult()) {
return firstSQLCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
}
@@ -152,12 +157,12 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
@RequiredArgsConstructor
@Getter
- private static final class CalculatedResult {
+ private static final class CalculatedResult implements DataConsistencyCalculatedResult {
@NonNull
private final Object maxUniqueKeyValue;
- private final int recordCount;
+ private final int recordsCount;
private final Collection<Collection<Object>> records;
@@ -172,10 +177,10 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
return false;
}
final CalculatedResult that = (CalculatedResult) o;
- boolean equalsFirst = new EqualsBuilder().append(getRecordCount(), that.getRecordCount()).append(getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue()).isEquals();
+ boolean equalsFirst = new EqualsBuilder().append(getRecordsCount(), that.getRecordsCount()).append(getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue()).isEquals();
if (!equalsFirst) {
log.warn("recordCount or maxUniqueKeyValue not match, recordCount1={}, recordCount2={}, maxUniqueKeyValue1={}, maxUniqueKeyValue2={}",
- getRecordCount(), that.getRecordCount(), getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue());
+ getRecordsCount(), that.getRecordsCount(), getMaxUniqueKeyValue(), that.getMaxUniqueKeyValue());
return false;
}
Iterator<Collection<Object>> thisIterator = this.records.iterator();
@@ -218,7 +223,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
@Override
public int hashCode() {
- return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue()).append(getRecordCount()).append(getRecords()).toHashCode();
+ return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue()).append(getRecordsCount()).append(getRecords()).toHashCode();
}
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
index edbd4afb2f7..afdbcce0856 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyFinishedException.java
@@ -28,6 +28,6 @@ public final class PipelineJobHasAlreadyFinishedException extends PipelineSQLExc
private static final long serialVersionUID = 6881217592831423520L;
public PipelineJobHasAlreadyFinishedException(final String jobId) {
- super(XOpenSQLState.GENERAL_ERROR, 95, "Job has already finished, please run `CHECK MIGRATION %s` to start a new data consistency check job.", jobId);
+ super(XOpenSQLState.GENERAL_ERROR, 95, "Data consistency check job has already finished.", jobId);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 7d14d2707e0..d48f4467a0c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -22,7 +22,7 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import java.util.Map;
import java.util.Optional;
@@ -42,7 +42,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
private volatile boolean stopping;
@Setter
- private volatile OneOffJobBootstrap oneOffJobBootstrap;
+ private volatile JobBootstrap jobBootstrap;
private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 9e7363970a0..8b435383e69 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -104,11 +104,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
});
}
- /**
- * Start.
- *
- * @return future
- */
+ @Override
public CompletableFuture<?> start() {
taskProgress.getIncrementalTaskDelay().setLatestActiveTimeMillis(System.currentTimeMillis());
CompletableFuture<?> dumperFuture = incrementalExecuteEngine.submit(dumper, new ExecuteCallback() {
@@ -140,9 +136,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
return CompletableFuture.allOf(dumperFuture, importerFuture);
}
- /**
- * Stop.
- */
+ @Override
public void stop() {
dumper.stop();
for (Importer each : importers) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index ca90aeb859e..a69ee2d121d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -83,11 +83,7 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
}
- /**
- * Start.
- *
- * @return future
- */
+ @Override
public CompletableFuture<?> start() {
CompletableFuture<?> dumperFuture = inventoryDumperExecuteEngine.submit(dumper, new ExecuteCallback() {
@@ -138,9 +134,7 @@ public final class InventoryTask implements PipelineTask, AutoCloseable {
return null;
}
- /**
- * Stop.
- */
+ @Override
public void stop() {
dumper.stop();
importer.stop();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
index ebc5a7e0e33..45dabe9ad68 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTask.java
@@ -19,11 +19,25 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import org.apache.shardingsphere.data.pipeline.api.task.progress.TaskProgress;
+import java.util.concurrent.CompletableFuture;
+
/**
* Pipeline task interface.
*/
public interface PipelineTask {
+ /**
+ * Start task.
+ *
+ * @return future
+ */
+ CompletableFuture<?> start();
+
+ /**
+ * Stop task.
+ */
+ void stop();
+
/**
* Get task id.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index 8d71eae5a7c..406fe64310c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -70,7 +70,7 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
ConsistencyCheckJob job = new ConsistencyCheckJob();
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
- job.setOneOffJobBootstrap(oneOffJobBootstrap);
+ job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 77355f5281d..b86955e252d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -18,26 +18,16 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
-
-import java.util.Collections;
-import java.util.Map;
/**
* Consistency check job.
@@ -45,50 +35,43 @@ import java.util.Map;
@Slf4j
public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
- private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
-
private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
@Override
public void execute(final ShardingContext shardingContext) {
String checkJobId = shardingContext.getJobName();
+ int shardingItem = shardingContext.getShardingItem();
+ log.info("Execute job {}-{}", checkJobId, shardingItem);
+ if (isStopping()) {
+ log.info("stopping true, ignore");
+ return;
+ }
setJobId(checkJobId);
- ConsistencyCheckJobConfiguration consistencyCheckJobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- JobStatus status = JobStatus.RUNNING;
- ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(consistencyCheckJobConfig, 0, status);
- jobAPI.persistJobItemProgress(jobItemContext);
- String parentJobId = consistencyCheckJobConfig.getParentJobId();
- log.info("execute consistency check, job id:{}, referred job id:{}", checkJobId, parentJobId);
- JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
- InventoryIncrementalJobPublicAPI jobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(jobType.getTypeName());
- Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = Collections.emptyMap();
- try {
- dataConsistencyCheckResult = StringUtils.isBlank(consistencyCheckJobConfig.getAlgorithmTypeName())
- ? jobPublicAPI.dataConsistencyCheck(parentJobId)
- : jobPublicAPI.dataConsistencyCheck(parentJobId, consistencyCheckJobConfig.getAlgorithmTypeName(), consistencyCheckJobConfig.getAlgorithmProps());
- status = JobStatus.FINISHED;
- } catch (final SQLWrapperException ex) {
- log.error("data consistency check failed", ex);
- status = JobStatus.CONSISTENCY_CHECK_FAILURE;
- jobAPI.persistJobItemErrorMessage(checkJobId, 0, ex);
+ ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING);
+ if (getTasksRunnerMap().containsKey(shardingItem)) {
+ log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
+ return;
}
- PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
- jobItemContext.setStatus(status);
- jobAPI.persistJobItemProgress(jobItemContext);
- jobAPI.stop(checkJobId);
- log.info("execute consistency check job finished, job id:{}, parent job id:{}", checkJobId, parentJobId);
+ ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
+ tasksRunner.start();
+ getTasksRunnerMap().put(shardingItem, tasksRunner);
}
@Override
public void stop() {
setStopping(true);
- if (null != getOneOffJobBootstrap()) {
- getOneOffJobBootstrap().shutdown();
+ if (null != getJobBootstrap()) {
+ getJobBootstrap().shutdown();
}
if (null == getJobId()) {
log.info("stop consistency check job, jobId is null, ignore");
return;
}
+ for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+ each.stop();
+ }
+ getTasksRunnerMap().clear();
String jobBarrierDisablePath = PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 0);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
new file mode 100644
index 00000000000..69055b5a85d
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckTasksRunner.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
+
+import java.sql.SQLException;
+import java.util.Map;
+
+/**
+ * Consistency check tasks runner.
+ *
+ * <p>{@link #start()} submits one consistency check of the parent job to an execute engine;
+ * {@link #stop()} marks the job item as stopping and stops the check executor.</p>
+ */
+@Slf4j
+public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
+
+    private final ConsistencyCheckJobAPI checkJobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+
+    @Getter
+    private final ConsistencyCheckJobItemContext jobItemContext;
+
+    private final ConsistencyCheckJobConfiguration checkJobConfig;
+
+    private final String checkJobId;
+
+    private final String parentJobId;
+
+    private final LifecycleExecutor checkExecutor;
+
+    private final ExecuteCallback checkExecuteCallback;
+
+    /**
+     * Create a runner for one consistency check job item.
+     *
+     * @param jobItemContext job item context; check job id and parent job id are taken from its job configuration
+     */
+    public ConsistencyCheckTasksRunner(final ConsistencyCheckJobItemContext jobItemContext) {
+        this.jobItemContext = jobItemContext;
+        checkJobConfig = jobItemContext.getJobConfig();
+        checkJobId = checkJobConfig.getJobId();
+        parentJobId = checkJobConfig.getParentJobId();
+        checkExecutor = new CheckLifecycleExecutor();
+        checkExecuteCallback = new CheckExecuteCallback();
+    }
+
+    /**
+     * Persist the job item progress and submit the check executor, unless the job item is already stopping.
+     */
+    @Override
+    public void start() {
+        if (jobItemContext.isStopping()) {
+            log.info("job stopping, ignore consistency check");
+            return;
+        }
+        PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
+        ExecuteEngine executeEngine = ExecuteEngine.newFixedThreadInstance(1, checkJobId + "-check");
+        executeEngine.submit(checkExecutor, checkExecuteCallback);
+    }
+
+    /**
+     * Mark the job item as stopping and stop the check executor.
+     */
+    @Override
+    public void stop() {
+        jobItemContext.setStopping(true);
+        log.info("stop, jobId={}, shardingItem={}", jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        checkExecutor.stop();
+    }
+
+    /**
+     * Executor that runs the data consistency check of the parent job and persists its result.
+     */
+    private final class CheckLifecycleExecutor extends AbstractLifecycleExecutor {
+
+        // Algorithm used by the running check; null until runBlocking() has built it. Read by doStop().
+        private volatile DataConsistencyCalculateAlgorithm calculateAlgorithm;
+
+        @Override
+        protected void runBlocking() {
+            log.info("execute consistency check, check job id: {}, parent job id: {}", checkJobId, parentJobId);
+            checkJobAPI.persistJobItemProgress(jobItemContext);
+            JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
+            InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) PipelineAPIFactory.getPipelineJobAPI(jobType);
+            PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(parentJobId);
+            DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(
+                    parentJobConfig, checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
+            // Publish the algorithm before the check starts so that doStop() can cancel it.
+            this.calculateAlgorithm = calculateAlgorithm;
+            Map<String, DataConsistencyCheckResult> dataConsistencyCheckResult = jobAPI.dataConsistencyCheck(parentJobConfig, calculateAlgorithm);
+            PipelineAPIFactory.getGovernanceRepositoryAPI().persistCheckJobResult(parentJobId, checkJobId, dataConsistencyCheckResult);
+        }
+
+        @Override
+        protected void doStop() {
+            DataConsistencyCalculateAlgorithm algorithm = calculateAlgorithm;
+            log.info("doStop, algorithm={}", algorithm);
+            if (null != algorithm) {
+                try {
+                    algorithm.cancel();
+                } catch (final SQLException ex) {
+                    // NOTE(review): this propagates to the caller of stop(); confirm callers tolerate an unchecked exception here.
+                    throw new RuntimeException(ex);
+                }
+            }
+        }
+    }
+
+    /**
+     * Callback that persists the final job item status and stops the check job.
+     */
+    private final class CheckExecuteCallback implements ExecuteCallback {
+
+        @Override
+        public void onSuccess() {
+            log.info("onSuccess, check job id: {}, parent job id: {}", checkJobId, parentJobId);
+            jobItemContext.setStatus(JobStatus.FINISHED);
+            checkJobAPI.persistJobItemProgress(jobItemContext);
+            checkJobAPI.stop(checkJobId);
+        }
+
+        @Override
+        public void onFailure(final Throwable throwable) {
+            // Log at error level with the throwable so the failure cause is visible in the log, not only in the persisted message.
+            log.error("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId, throwable);
+            // Persist the error under the sharding item of this job item instead of a hard-coded 0.
+            checkJobAPI.persistJobItemErrorMessage(checkJobId, jobItemContext.getShardingItem(), throwable);
+            jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
+            checkJobAPI.persistJobItemProgress(jobItemContext);
+            checkJobAPI.stop(checkJobId);
+        }
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 404bf1edc72..b0429f58cd2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -72,7 +72,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
MigrationJob job = new MigrationJob();
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
- job.setOneOffJobBootstrap(oneOffJobBootstrap);
+ job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index ea68ca1cd0b..50ab3a39afb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -18,54 +18,32 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTableInventoryDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.SQLWrapperException;
-import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
* Data consistency checker for migration job.
@@ -81,144 +59,35 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
this.jobConfig = jobConfig;
- this.readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
+ readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
}
@Override
- public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculator) {
- Map<String, DataConsistencyCountCheckResult> countCheckResult = checkCount();
- Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
- ? checkData(calculator)
- : Collections.emptyMap();
- Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>(countCheckResult.size());
- for (Entry<String, DataConsistencyCountCheckResult> entry : countCheckResult.entrySet()) {
- result.put(entry.getKey(), new DataConsistencyCheckResult(entry.getValue(), contentCheckResult.getOrDefault(entry.getKey(), new DataConsistencyContentCheckResult(false))));
- }
- return result;
- }
-
- private Map<String, DataConsistencyCountCheckResult> checkCount() {
- ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-count-check-%d");
- ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
+ public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculateAlgorithm) {
+ verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getSource());
+ verifyPipelineDatabaseType(calculateAlgorithm, jobConfig.getTarget());
+ SchemaTableName sourceTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getSourceTableName())), new TableName(jobConfig.getSourceTableName()));
+ SchemaTableName targetTable = new SchemaTableName(new SchemaName(tableNameSchemaNameMapping.getSchemaName(jobConfig.getTargetTableName())), new TableName(jobConfig.getTargetTableName()));
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
- Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(1, 1);
+ Map<String, DataConsistencyCheckResult> result = new LinkedHashMap<>();
try (
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
- result.put(jobConfig.getSourceTableName(), checkCount(sourceDataSource, targetDataSource, executor));
- return result;
+ PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
+ SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(
+ jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm);
+ result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
- } finally {
- executor.shutdown();
- executor.shutdownNow();
- }
- }
-
- private DataConsistencyCountCheckResult checkCount(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final ThreadPoolExecutor executor) {
- Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, jobConfig.getSourceTableName(), sourceDataSource.getDatabaseType()));
- Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, jobConfig.getTargetTableName(), targetDataSource.getDatabaseType()));
- long sourceCount;
- long targetCount;
- try {
- sourceCount = sourceFuture.get();
- } catch (final InterruptedException | ExecutionException ex) {
- if (ex.getCause() instanceof PipelineSQLException) {
- throw (PipelineSQLException) ex.getCause();
- }
- throw new SQLWrapperException(new SQLException(ex));
- }
- try {
- targetCount = targetFuture.get();
- } catch (final InterruptedException | ExecutionException ex) {
- if (ex.getCause() instanceof PipelineSQLException) {
- throw (PipelineSQLException) ex.getCause();
- }
- throw new SQLWrapperException(new SQLException(ex));
- }
- return new DataConsistencyCountCheckResult(sourceCount, targetCount);
- }
-
- // TODO use digest (crc32, murmurhash)
- private String getJobIdDigest(final String jobId) {
- return jobId.length() <= 6 ? jobId : jobId.substring(0, 6);
- }
-
- private long count(final DataSource dataSource, final String tableName, final DatabaseType databaseType) {
- String sql = PipelineSQLBuilderFactory.getInstance(databaseType.getType()).buildCountSQL(tableNameSchemaNameMapping.getSchemaName(tableName), tableName);
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
- ResultSet resultSet = preparedStatement.executeQuery()) {
- resultSet.next();
- return resultSet.getLong(1);
- } catch (final SQLException ex) {
- throw new PipelineTableDataConsistencyCheckLoadingFailedException(tableName);
- }
- }
-
- private Map<String, DataConsistencyContentCheckResult> checkData(final DataConsistencyCalculateAlgorithm calculator) {
- checkPipelineDatabaseType(calculator, jobConfig.getSource());
- PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSource();
- checkPipelineDatabaseType(calculator, jobConfig.getTarget());
- PipelineDataSourceConfiguration targetDataSourceConfig = jobConfig.getTarget();
- ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobConfig.getJobId()) + "-data-check-%d");
- ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
- Map<String, DataConsistencyContentCheckResult> result = new HashMap<>(1, 1);
- try (
- PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
- PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
- String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
- String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
- StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
- for (String each : Collections.singleton(jobConfig.getSourceTableName())) {
- PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
- ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(each));
- Collection<String> columnNames = tableMetaData.getColumnNames();
- PipelineColumnMetaData uniqueKey = jobConfig.getUniqueKeyColumn();
- DataConsistencyCalculateParameter sourceParameter = buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
- DataConsistencyCalculateParameter targetParameter = buildParameter(
- targetDataSource, tableNameSchemaNameMapping, jobConfig.getTargetTableName(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
- Iterator<Object> sourceCalculatedResults = calculator.calculate(sourceParameter).iterator();
- Iterator<Object> targetCalculatedResults = calculator.calculate(targetParameter).iterator();
- boolean contentMatched = true;
- while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
- if (null != readRateLimitAlgorithm) {
- readRateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
- }
- Future<Object> sourceFuture = executor.submit(sourceCalculatedResults::next);
- Future<Object> targetFuture = executor.submit(targetCalculatedResults::next);
- Object sourceCalculatedResult = sourceFuture.get();
- Object targetCalculatedResult = targetFuture.get();
- contentMatched = Objects.equals(sourceCalculatedResult, targetCalculatedResult);
- if (!contentMatched) {
- break;
- }
- }
- result.put(each, new DataConsistencyContentCheckResult(contentMatched));
- }
- } catch (final SQLException ex) {
- throw new SQLWrapperException(ex);
- } catch (final ExecutionException | InterruptedException ex) {
- throw new SQLWrapperException(new SQLException(ex.getCause()));
- } finally {
- executor.shutdown();
- executor.shutdownNow();
}
return result;
}
- private void checkPipelineDatabaseType(final DataConsistencyCalculateAlgorithm calculator, final PipelineDataSourceConfiguration dataSourceConfig) {
- ShardingSpherePreconditions.checkState(calculator.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType().getType()),
+ private void verifyPipelineDatabaseType(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final PipelineDataSourceConfiguration dataSourceConfig) {
+ ShardingSpherePreconditions.checkState(calculateAlgorithm.getSupportedDatabaseTypes().contains(dataSourceConfig.getDatabaseType().getType()),
() -> new UnsupportedPipelineDatabaseTypeException(dataSourceConfig.getDatabaseType()));
}
-
- private DataConsistencyCalculateParameter buildParameter(final PipelineDataSourceWrapper sourceDataSource, final TableNameSchemaNameMapping tableNameSchemaNameMapping,
- final String tableName, final Collection<String> columnNames,
- final String sourceDatabaseType, final String targetDatabaseType, final PipelineColumnMetaData uniqueKey) {
- return new DataConsistencyCalculateParameter(sourceDataSource, tableNameSchemaNameMapping, tableName, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
- }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 018570e184c..244dc46efcf 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -112,8 +112,8 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
public void stop() {
setStopping(true);
dataSourceManager.close();
- if (null != getOneOffJobBootstrap()) {
- getOneOffJobBootstrap().shutdown();
+ if (null != getJobBootstrap()) {
+ getJobBootstrap().shutdown();
}
String jobId = getJobId();
if (null == jobId) {
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index b0d0f42a10d..e5ae0a9635e 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
@@ -35,7 +35,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Iterator;
import static org.hamcrest.CoreMatchers.is;
@@ -60,28 +59,28 @@ public final class CRC32MatchDataConsistencyCalculateAlgorithmTest {
@Before
public void setUp() throws SQLException {
PipelineColumnMetaData uniqueKey = new PipelineColumnMetaData(1, "id", Types.INTEGER, "integer", false, true, true);
- parameter = new DataConsistencyCalculateParameter(pipelineDataSource, new TableNameSchemaNameMapping(Collections.emptyMap()),
+ parameter = new DataConsistencyCalculateParameter(pipelineDataSource, null,
"foo_tbl", Arrays.asList("foo_col", "bar_col"), "FIXTURE", "FIXTURE", uniqueKey);
when(pipelineDataSource.getConnection()).thenReturn(connection);
}
@Test
public void assertCalculateSuccess() throws SQLException {
- PreparedStatement preparedStatement0 = mockPreparedStatement(0L);
+ PreparedStatement preparedStatement0 = mockPreparedStatement(123L, 10);
when(connection.prepareStatement("SELECT CRC32(foo_col) FROM foo_tbl")).thenReturn(preparedStatement0);
- PreparedStatement preparedStatement1 = mockPreparedStatement(1L);
+ PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
when(connection.prepareStatement("SELECT CRC32(bar_col) FROM foo_tbl")).thenReturn(preparedStatement1);
- Iterator<Object> actual = new CRC32MatchDataConsistencyCalculateAlgorithm().calculate(parameter).iterator();
- assertThat(actual.next(), is(0L));
- assertThat(actual.next(), is(1L));
+ Iterator<DataConsistencyCalculatedResult> actual = new CRC32MatchDataConsistencyCalculateAlgorithm().calculate(parameter).iterator();
+ assertThat(actual.next().getRecordsCount(), is(10));
assertFalse(actual.hasNext());
}
- private PreparedStatement mockPreparedStatement(final long expectedCRC32Result) throws SQLException {
+ private PreparedStatement mockPreparedStatement(final long expectedCRC32Result, final int expectedRecordsCount) throws SQLException {
ResultSet resultSet = mock(ResultSet.class);
PreparedStatement result = mock(PreparedStatement.class, RETURNS_DEEP_STUBS);
when(result.executeQuery()).thenReturn(resultSet);
when(resultSet.getLong(1)).thenReturn(expectedCRC32Result);
+ when(resultSet.getInt(2)).thenReturn(expectedRecordsCount);
return result;
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 14440f0344d..286de508afe 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -51,7 +51,7 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
@Override
public Optional<String> buildCRC32SQL(final String schemaName, final String tableName, final String column) {
- return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS UNSIGNED)) AS checksum FROM %s", quote(column), quote(tableName)));
+ return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", quote(column), quote(tableName)));
}
@Override
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 3a2049c3487..f2b276fefde 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -55,7 +55,7 @@ public final class MySQLPipelineSQLBuilderTest {
public void assertBuildSumCrc32SQL() {
Optional<String> actual = sqlBuilder.buildCRC32SQL(null, "t2", "id");
assertTrue(actual.isPresent());
- assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS UNSIGNED)) AS checksum FROM t2"));
+ assertThat(actual.get(), is("SELECT BIT_XOR(CAST(CRC32(id) AS UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM t2"));
}
private DataRecord mockDataRecord(final String tableName) {
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 0c4047c2286..2929e143128 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -99,8 +99,9 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
startIncrementTask(new MySQLIncrementTask(jdbcTemplate, getSourceTableOrderName(), keyGenerateAlgorithm, 30));
String orderJobId = getJobIdByTableName(getSourceTableOrderName());
String orderItemJobId = getJobIdByTableName("t_order_item");
- assertMigrationSuccessById(orderJobId);
- assertMigrationSuccessById(orderItemJobId);
+ assertMigrationSuccessById(orderJobId, "DATA_MATCH");
+ assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
+ assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
for (String each : listJobId()) {
commitMigrationByJobId(each);
@@ -111,12 +112,12 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "");
}
- private void assertMigrationSuccessById(final String jobId) throws SQLException, InterruptedException {
+ private void assertMigrationSuccessById(final String jobId, final String algorithmType) throws SQLException, InterruptedException {
List<Map<String, Object>> jobStatus = waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
}
- assertCheckMigrationSuccess(jobId, "DATA_MATCH");
+ assertCheckMigrationSuccess(jobId, algorithmType);
stopMigrationByJobId(jobId);
}
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
index 3d1c2827377..01fd3394a42 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import org.junit.Test;
import java.util.Properties;
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
index f6ec7327507..051ce468505 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/DataConsistencyCalculateAlgorithmFixture.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
import lombok.Getter;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
@@ -39,8 +40,12 @@ public final class DataConsistencyCalculateAlgorithmFixture implements DataConsi
}
@Override
- public Iterable<Object> calculate(final DataConsistencyCalculateParameter parameter) {
- return Collections.singletonList(true);
+ public Iterable<DataConsistencyCalculatedResult> calculate(final DataConsistencyCalculateParameter parameter) {
+ return Collections.singletonList(new FixtureDataConsistencyCalculatedResult(2));
+ }
+
+ @Override
+ public void cancel() {
}
@Override
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
similarity index 64%
copy from test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
copy to test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
index 3d1c2827377..b61af786c4a 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCalculateAlgorithmFactoryTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureDataConsistencyCalculatedResult.java
@@ -15,16 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.check.consistency;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.junit.Test;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
-import java.util.Properties;
-
-public final class DataConsistencyCalculateAlgorithmFactoryTest {
+@RequiredArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class FixtureDataConsistencyCalculatedResult implements DataConsistencyCalculatedResult {
- @Test
- public void assertNewInstanceSuccess() {
- DataConsistencyCalculateAlgorithmFactory.newInstance("FIXTURE", new Properties());
- }
+ private final int recordsCount;
}