You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/09/23 12:25:49 UTC

[shardingsphere] branch master updated: For code format (#21147)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1409d7ed70b For code format (#21147)
1409d7ed70b is described below

commit 1409d7ed70b4f43a39a773884fc93aafc5036960
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Fri Sep 23 20:25:40 2022 +0800

    For code format (#21147)
---
 .../MigrationChangedJobConfigurationProcessor.java |  8 ++---
 .../migration/MigrationDataConsistencyChecker.java | 40 ++++++++++------------
 .../pipeline/scenario/migration/MigrationJob.java  |  2 +-
 .../scenario/migration/MigrationJobAPIImpl.java    | 23 ++++++-------
 .../migration/MigrationTaskConfiguration.java      |  2 +-
 5 files changed, 34 insertions(+), 41 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index 8f5b30e7eef..490bcb2afe7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -27,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -38,7 +37,7 @@ import java.util.concurrent.CompletableFuture;
 public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
     
     @Override
-    public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
+    public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
             log.info("{} is disabled", jobId);
@@ -61,8 +60,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
                 break;
             case DELETED:
                 log.info("deleted jobId={}", jobId);
-                MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
-                new MigrationJobPreparer().cleanup(jobConfig);
+                new MigrationJobPreparer().cleanup(YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter()));
                 PipelineJobCenter.stop(jobId);
                 break;
             default:
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index a0fd5c4ae5a..6301ea80ba6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
@@ -31,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
@@ -71,28 +69,27 @@ import java.util.concurrent.TimeUnit;
  * Data consistency checker for migration job.
  */
 @Slf4j
-public final class MigrationDataConsistencyChecker implements PipelineDataConsistencyChecker {
+public final class MigrationDataConsistencyChecker {
     
     private final MigrationJobConfiguration jobConfig;
     
-    private final String sourceTableName;
-    
-    private final String targetTableName;
+    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
-    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
-    
-    public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext) {
+    public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
         this.jobConfig = jobConfig;
-        sourceTableName = jobConfig.getSourceTableName();
-        targetTableName = jobConfig.getTargetTableName();
-        tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
-                new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
-        this.readRateLimitAlgorithm = null != processContext ? processContext.getReadRateLimitAlgorithm() : null;
+        this.readRateLimitAlgorithm = readRateLimitAlgorithm;
+        tableNameSchemaNameMapping = new TableNameSchemaNameMapping(
+                TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), jobConfig.getTargetTableName()))));
     }
     
-    @Override
+    /**
+     * Check data consistency.
+     *
+     * @param calculator data consistency calculate algorithm
+     * @return checked result, key is logic table name, value is check result.
+     */
     public Map<String, DataConsistencyCheckResult> check(final DataConsistencyCalculateAlgorithm calculator) {
         Map<String, DataConsistencyCountCheckResult> countCheckResult = checkCount();
         Map<String, DataConsistencyContentCheckResult> contentCheckResult = countCheckResult.values().stream().allMatch(DataConsistencyCountCheckResult::isMatched)
@@ -110,12 +107,11 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(), jobConfig.getSource().getParameter());
         PipelineDataSourceConfiguration targetDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(), jobConfig.getTarget().getParameter());
-        // TODO migration might support multiple tables
         Map<String, DataConsistencyCountCheckResult> result = new LinkedHashMap<>(1, 1);
         try (
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
                 PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
-            result.put(sourceTableName, checkCount(sourceDataSource, targetDataSource, executor));
+            result.put(jobConfig.getSourceTableName(), checkCount(sourceDataSource, targetDataSource, executor));
             return result;
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
@@ -126,8 +122,8 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     }
     
     private DataConsistencyCountCheckResult checkCount(final PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper targetDataSource, final ThreadPoolExecutor executor) {
-        Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, sourceTableName, sourceDataSource.getDatabaseType()));
-        Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, targetTableName, targetDataSource.getDatabaseType()));
+        Future<Long> sourceFuture = executor.submit(() -> count(sourceDataSource, jobConfig.getSourceTableName(), sourceDataSource.getDatabaseType()));
+        Future<Long> targetFuture = executor.submit(() -> count(targetDataSource, jobConfig.getTargetTableName(), targetDataSource.getDatabaseType()));
         long sourceCount;
         long targetCount;
         try {
@@ -181,14 +177,14 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
             String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
             String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
             StandardPipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
-            for (String each : Collections.singletonList(sourceTableName)) {
+            for (String each : Collections.singleton(jobConfig.getSourceTableName())) {
                 PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
                 ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(each));
                 Collection<String> columnNames = tableMetaData.getColumnNames();
                 PipelineColumnMetaData uniqueKey = jobConfig.getUniqueKeyColumn();
                 DataConsistencyCalculateParameter sourceParameter = buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
-                DataConsistencyCalculateParameter targetParameter = buildParameter(targetDataSource, tableNameSchemaNameMapping, targetTableName, columnNames, targetDatabaseType, sourceDatabaseType,
-                        uniqueKey);
+                DataConsistencyCalculateParameter targetParameter = buildParameter(
+                        targetDataSource, tableNameSchemaNameMapping, jobConfig.getTargetTableName(), columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
                 Iterator<Object> sourceCalculatedResults = calculator.calculate(sourceParameter).iterator();
                 Iterator<Object> targetCalculatedResults = calculator.calculate(targetParameter).iterator();
                 boolean contentMatched = true;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 817512e1d17..fcfa8dbf962 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -43,8 +43,8 @@ import java.sql.SQLException;
 /**
  * Migration job.
  */
-@Slf4j
 @RequiredArgsConstructor
+@Slf4j
 public final class MigrationJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
     
     private final MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 027c13f1a40..034c7555060 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -112,11 +112,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
     
     private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
     
-    @Override
-    public JobType getJobType() {
-        return JobType.MIGRATION;
-    }
-    
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
         MigrationJobId jobId = (MigrationJobId) pipelineJobId;
@@ -136,8 +131,7 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
         MigrationJobInfo result = new MigrationJobInfo(jobId);
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         fillJobInfo(result, jobConfigPOJO);
-        MigrationJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
-        result.setTable(jobConfig.getSourceTableName());
+        result.setTable(getJobConfiguration(jobConfigPOJO).getSourceTableName());
         return result;
     }
     
@@ -169,11 +163,6 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
         return marshalJobId(jobId);
     }
     
-    @Override
-    protected String getJobClassName() {
-        return MigrationJob.class.getName();
-    }
-    
     @Override
     protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
         return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration) jobConfig);
@@ -416,4 +405,14 @@ public final class MigrationJobAPIImpl extends AbstractInventoryIncrementalJobAP
         result.setParameter(parameter);
         return result;
     }
+    
+    @Override
+    public JobType getJobType() {
+        return JobType.MIGRATION;
+    }
+    
+    @Override
+    protected String getJobClassName() {
+        return MigrationJob.class.getName();
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
index 8420bbaf6f1..1cc50d54b85 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationTaskConfiguration.java
@@ -28,8 +28,8 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 /**
  * Migration task configuration.
  */
-@Getter
 @RequiredArgsConstructor
+@Getter
 @ToString
 public final class MigrationTaskConfiguration implements PipelineTaskConfiguration {