You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/10/24 11:05:15 UTC

[shardingsphere] branch master updated: Improve consistency check job item context (#21719)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0aa7d8a9845 Improve consistency check job item context (#21719)
0aa7d8a9845 is described below

commit 0aa7d8a98454d487088e7c6d20f9584e46f320aa
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Oct 24 19:05:08 2022 +0800

    Improve consistency check job item context (#21719)
    
    * Improve consistency check job item context
    
    * Fix codestyle
    
    * Fix codestyle
---
 ...SingleTableInventoryDataConsistencyChecker.java | 29 +++++-----------------
 .../ConsistencyCheckJobAPIImpl.java                |  2 +-
 .../ConsistencyCheckJobItemContext.java            |  5 ++--
 .../migration/MigrationDataConsistencyChecker.java | 17 ++++++++++++-
 .../core/api/impl/MigrationJobAPIImplTest.java     |  9 ++++++-
 5 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
index a9eaa29f2a2..33bbd31266c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/SingleTableInventoryDataConsistencyChecker.java
@@ -19,8 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.check.consistency;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculatedResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
@@ -28,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -36,8 +34,6 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -46,9 +42,7 @@ import org.apache.shardingsphere.infra.util.exception.external.sql.type.wrapper.
 
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
@@ -80,7 +74,7 @@ public final class SingleTableInventoryDataConsistencyChecker {
     
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
-    private final ConsistencyCheckJobItemContext consistencyCheckJobItemContext;
+    private final PipelineJobProgressListener jobProgressListener;
     
     /**
      * Data consistency check.
@@ -92,7 +86,7 @@ public final class SingleTableInventoryDataConsistencyChecker {
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(jobId) + "-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         try {
-            return check(calculateAlgorithm, executor, consistencyCheckJobItemContext);
+            return check(calculateAlgorithm, executor, jobProgressListener);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
@@ -100,20 +94,12 @@ public final class SingleTableInventoryDataConsistencyChecker {
     }
     
     private DataConsistencyCheckResult check(final DataConsistencyCalculateAlgorithm calculateAlgorithm, final ThreadPoolExecutor executor,
-                                             final ConsistencyCheckJobItemContext checkJobItemContext) {
+                                             final PipelineJobProgressListener jobProgressListener) {
         String sourceDatabaseType = sourceDataSource.getDatabaseType().getType();
         String targetDatabaseType = targetDataSource.getDatabaseType().getType();
         String sourceTableName = sourceTable.getTableName().getOriginal();
         String schemaName = sourceTable.getSchemaName().getOriginal();
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaName, sourceTableName);
-        if (null != checkJobItemContext) {
-            checkJobItemContext.setTableNames(Collections.singletonList(sourceTableName));
-            InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
-            Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
-            long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
-            checkJobItemContext.setRecordsCount(recordsCount);
-            log.info("check, get records count: {}", recordsCount);
-        }
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new PipelineTableDataConsistencyCheckLoadingFailedException(sourceTableName));
         Collection<String> columnNames = tableMetaData.getColumnNames();
         DataConsistencyCalculateParameter sourceParameter = buildParameter(
@@ -140,13 +126,10 @@ public final class SingleTableInventoryDataConsistencyChecker {
                 log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKey={}", jobId, sourceTable, targetTable, uniqueKey);
                 break;
             }
-            if (null != checkJobItemContext) {
-                checkJobItemContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
+            if (null != jobProgressListener) {
+                jobProgressListener.onProgressUpdated(new PipelineJobProgressUpdatedParameter(sourceCalculatedResult.getRecordsCount()));
             }
         }
-        if (null != checkJobItemContext) {
-            checkJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
-        }
         return new DataConsistencyCheckResult(new DataConsistencyCountCheckResult(sourceRecordsCount, targetRecordsCount), new DataConsistencyContentCheckResult(contentMatched));
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 6a3e7dc8d72..6c7dad4eca7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -119,7 +119,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
         jobProgress.setRecordsCount(checkJobItemContext.getRecordsCount());
         jobProgress.setCheckBeginTimeMillis(checkJobItemContext.getCheckBeginTimeMillis());
         jobProgress.setCheckEndTimeMillis(checkJobItemContext.getCheckEndTimeMillis());
-        jobProgress.setTableNames(null == checkJobItemContext.getTableNames() ? null : String.join(",", checkJobItemContext.getTableNames()));
+        jobProgress.setTableNames(String.join(",", checkJobItemContext.getTableNames()));
         YamlConsistencyCheckJobProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobProgress);
         PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
index c35ef0adae6..3a06075341c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobItemContext.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.Pipelin
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -47,9 +48,9 @@ public final class ConsistencyCheckJobItemContext implements PipelineJobItemCont
     
     private volatile JobStatus status;
     
-    private Collection<String> tableNames;
+    private final Collection<String> tableNames = new CopyOnWriteArraySet<>();
     
-    private volatile Long recordsCount;
+    private volatile long recordsCount;
     
     private final AtomicLong checkedRecordsCount = new AtomicLong(0);
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 88e5cc1fa59..fc4e47e466e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -17,12 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.InventoryIncrementalJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
@@ -31,6 +35,7 @@ import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTabl
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
@@ -43,10 +48,12 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Data consistency checker for migration job.
  */
+@Slf4j
 public final class MigrationDataConsistencyChecker implements PipelineDataConsistencyChecker {
     
     private final MigrationJobConfiguration jobConfig;
@@ -76,10 +83,18 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         try (
                 PipelineDataSourceWrapper sourceDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getSource());
                 PipelineDataSourceWrapper targetDataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget())) {
+            String jobId = jobConfig.getJobId();
+            InventoryIncrementalJobPublicAPI inventoryIncrementalJobPublicAPI = PipelineJobPublicAPIFactory.getInventoryIncrementalJobPublicAPI(PipelineJobIdUtils.parseJobType(jobId).getTypeName());
+            Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobPublicAPI.getJobProgress(jobId);
+            long recordsCount = jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+            checkJobItemContext.setRecordsCount(recordsCount);
+            checkJobItemContext.getTableNames().add(jobConfig.getSourceTableName());
+            log.info("consistency check, get records count: {}", recordsCount);
             PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(sourceDataSource);
-            SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobConfig.getJobId(), sourceDataSource, targetDataSource,
+            SingleTableInventoryDataConsistencyChecker singleTableInventoryChecker = new SingleTableInventoryDataConsistencyChecker(jobId, sourceDataSource, targetDataSource,
                     sourceTable, targetTable, jobConfig.getUniqueKeyColumn(), metaDataLoader, readRateLimitAlgorithm, checkJobItemContext);
             result.put(sourceTable.getTableName().getOriginal(), singleTableInventoryChecker.check(calculateAlgorithm));
+            checkJobItemContext.setCheckEndTimeMillis(System.currentTimeMillis());
         } catch (final SQLException ex) {
             throw new SQLWrapperException(ex);
         }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 2e60df2aa7d..436a9189a3d 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -21,6 +21,7 @@ import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -32,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobItemContext;
@@ -64,6 +66,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public final class MigrationJobAPIImplTest {
@@ -147,7 +151,10 @@ public final class MigrationJobAPIImplTest {
         Optional<String> jobId = jobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         DataConsistencyCalculateAlgorithm calculateAlgorithm = jobAPI.buildDataConsistencyCalculateAlgorithm(jobConfig, "FIXTURE", null);
-        Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, null);
+        ConsistencyCheckJobConfiguration checkJobConfig = mock(ConsistencyCheckJobConfiguration.class);
+        when(checkJobConfig.getJobId()).thenReturn(jobConfig.getJobId() + "1");
+        ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext(checkJobConfig, 0, JobStatus.RUNNING);
+        Map<String, DataConsistencyCheckResult> checkResultMap = jobAPI.dataConsistencyCheck(jobConfig, calculateAlgorithm, checkJobItemContext);
         assertThat(checkResultMap.size(), is(1));
         assertTrue(checkResultMap.get("t_order").getCountCheckResult().isMatched());
         assertThat(checkResultMap.get("t_order").getCountCheckResult().getTargetRecordsCount(), is(2L));