You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/09/23 12:25:49 UTC
[shardingsphere] branch master updated: For code format (#21147)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1409d7ed70b For code format (#21147)
1409d7ed70b is described below
commit 1409d7ed70b4f43a39a773884fc93aafc5036960
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Fri Sep 23 20:25:40 2022 +0800
For code format (#21147)
---
.../MigrationChangedJobConfigurationProcessor.java | 8 ++---
.../migration/MigrationDataConsistencyChecker.java | 40 ++++++++++------------
.../pipeline/scenario/migration/MigrationJob.java | 2 +-
.../scenario/migration/MigrationJobAPIImpl.java | 23 ++++++-------
.../migration/MigrationTaskConfiguration.java | 2 +-
5 files changed, 34 insertions(+), 41 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 8f5b30e7eef..490bcb2afe7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -27,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import java.util.concurrent.CompletableFuture;
@@ -38,7 +37,7 @@ import java.util.concurrent.CompletableFuture;
public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
@Override
- public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
+ public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
String jobId = jobConfigPOJO.getJobName();
if (jobConfigPOJO.isDisabled()) {
log.info("{} is disabled", jobId);
@@ -61,8 +60,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
break;
case DELETED:
log.info("deleted jobId={}", jobId);
- MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
- new MigrationJobPreparer().cleanup(jobConfig);
+ new MigrationJobPreparer().cleanup(YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter()));
PipelineJobCenter.stop(jobId);
break;
default:
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index a0fd5c4ae5a..6301ea80ba6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -31,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -71,28 +69,27 @@ import java.util.concurrent.TimeUnit;
* Data consistency checker for migration job.
*/
@Slf4j
-public final class MigrationDataConsistencyChecker implements PipelineDataConsistencyChecker {
+public final class MigrationDataConsistencyChecker {
private final MigrationJobConfiguration jobConfig;
- private final String sourceTableName;
-
- private final String targetTableName;
+ private final JobRateLimitAlgorithm readRateLimitAlgorithm;
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
- private final JobRateLimitAlgorithm readRateLimitAlgorithm;
-
- public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
+ public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
this.jobConfig = jobConfig;
- sourceTableName = jobConfig.getSourceTableName();
- targetTableName = jobConfig.getTargetTableName();
- tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
- new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
- this.readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
+ this.readRateLimitAlgorithm = readRateLimitAlgorithm;
+ tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
+ TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
}
- @Override
+ /**
+ * Check data consistency.
+ *
+ * @param calculator data consistency calculate algorithm
+ * @return checked result, key is logic table name, value is check result.
+ */
public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculator) {
Map<String, DataConsistencyCountCheckResult> countCheckResult = checkCount();
Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
@@ -110,12 +107,11 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
- // TODO migration might support multiple tables
Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(1, 1);
try (
PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
- result.put(sourceTableName, checkCount(sourceDataSource, targetDataSource, executor));
+ result.put(jobConfig.getSourceTableName(), checkCount(sourceDataSource, targetDataSource, executor));
return result;
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
@@ -126,8 +122,8 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
}
private DataConsistencyCountCheckResult checkCount(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final ThreadPoolExecutor executor) {
- Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, sourceTableName, sourceDataSource.getDatabaseType()));
- Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, targetTableName, targetDataSource.getDatabaseType()));
+ Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, jobConfig.getSourceTableName(), sourceDataSource.getDatabaseType()));
+ Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, jobConfig.getTargetTableName(), targetDataSource.getDatabaseType()));
long sourceCount;
long targetCount;
try {
@@ -181,14 +177,14 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
- for (String each : Collections.singletonList(sourceTableName)) {
+ for (String each : Collections.singleton(jobConfig.getSourceTableName())) {
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(each));
Collection<String> columnNames = tableMetaData.getColumnNames();
PipelineColumnMetaData uniqueKey = jobConfig.getUniqueKeyColumn();
DataConsistencyCalculateParameter sourceParameter = buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
- DataConsistencyCalculateParameter targetParameter = buildParameter(targetDataSource, tableNameSchemaNameMapping, targetTableName, columnNames, targetDatabaseType, sourceDatabaseType,
- uniqueKey);
+ DataConsistencyCalculateParameter targetParameter = buildParameter(
+ targetDataSource, tableNameSchemaNameMapping, jobConfig.getTargetTableName(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
Iterator<Object> sourceCalculatedResults = calculator.calculate(sourceParameter).iterator();
Iterator<Object> targetCalculatedResults = calculator.calculate(targetParameter).iterator();
boolean contentMatched = true;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 817512e1d17..fcfa8dbf962 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -43,8 +43,8 @@ import java.sql.SQLException;
/**
* Migration job.
*/
-@Slf4j
@RequiredArgsConstructor
+@Slf4j
public final class MigrationJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
private final MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 027c13f1a40..034c7555060 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -112,11 +112,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
- @Override
- public JobType getJobType() {
- return JobType.MIGRATION;
- }
-
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
MigrationJobId jobId = (MigrationJobId) pipelineJobId;
@@ -136,8 +131,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
MigrationJobInfo result = new MigrationJobInfo(jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
fillJobInfo(result, jobConfigPOJO);
- MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
- result.setTable(jobConfig.getSourceTableName());
+ result.setTable(getJobConfiguration(jobConfigPOJO).getSourceTableName());
return result;
}
@@ -169,11 +163,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
return marshalJobId(jobId);
}
- @Override
- protected String getJobClassName() {
- return MigrationJob.class.getName();
- }
-
@Override
protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration) jobConfig);
@@ -416,4 +405,14 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
result.setParameter(parameter);
return result;
}
+
+ @Override
+ public JobType getJobType() {
+ return JobType.MIGRATION;
+ }
+
+ @Override
+ protected String getJobClassName() {
+ return MigrationJob.class.getName();
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
index 8420bbaf6f1..1cc50d54b85 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
@@ -28,8 +28,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
/**
* Migration task configuration.
*/
-@Getter
@RequiredArgsConstructor
+@Getter
@ToString
public final class MigrationTaskConfiguration implements PipelineTaskConfiguration {