You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2022/12/28 02:48:54 UTC

[shardingsphere] branch master updated: Refactor pipeline getJobItemProgress return Optional (#23108)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f3946ece36 Refactor pipeline getJobItemProgress return Optional (#23108)
5f3946ece36 is described below

commit 5f3946ece365bb5b034be19f1dc87677e1b0795e
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Wed Dec 28 10:48:47 2022 +0800

    Refactor pipeline getJobItemProgress return Optional (#23108)
    
    * Refactor GovernanceRepositoryAPI.getJobItemProgress return Optional
    
    * Refactor PipelineJobAPI.getJobItemProgress return Optional
    
    * Compatible with empty offset value
---
 .../data/pipeline/cdc/api/impl/CDCJobAPIImpl.java  |  3 ++-
 .../data/pipeline/cdc/core/job/CDCJob.java         |  6 +++--
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  2 +-
 .../pipeline/core/api/GovernanceRepositoryAPI.java |  2 +-
 .../core/api/InventoryIncrementalJobAPI.java       |  3 ++-
 .../data/pipeline/core/api/PipelineJobAPI.java     |  2 +-
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 23 ++++++++---------
 .../core/api/impl/GovernanceRepositoryAPIImpl.java |  5 ++--
 .../consistencycheck/ConsistencyCheckJob.java      |  8 ++++--
 .../api/impl/ConsistencyCheckJobAPI.java           | 29 +++++++++++-----------
 .../pipeline/scenario/migration/MigrationJob.java  |  5 ++--
 .../migration/prepare/MigrationJobPreparer.java    |  9 ++++---
 .../handler/cdc/fixture/FixtureCDCJobAPI.java      |  2 +-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  | 13 +++++-----
 .../core/api/impl/MigrationJobAPITest.java         |  7 +++---
 15 files changed, 63 insertions(+), 56 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
index 7b93604f380..7d7c6b271be 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java
@@ -84,6 +84,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -232,7 +233,7 @@ public final class CDCJobAPIImpl extends AbstractInventoryIncrementalJobAPIImpl
     }
     
     @Override
-    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+    public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
         // TODO to be implemented
         return null;
     }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 8dbe1a30974..1b7e68c95aa 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -38,6 +38,8 @@ import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTas
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
+import java.util.Optional;
+
 /**
  * CDC job.
  */
@@ -62,10 +64,10 @@ public final class CDCJob extends AbstractSimplePipelineJob {
     protected PipelineJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
         int shardingItem = shardingContext.getShardingItem();
         CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        InventoryIncrementalJobItemProgress initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+        Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
         CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
         CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
-        return new CDCJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager, importerConnector);
+        return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, importerConnector);
     }
     
     protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index f90bac04580..8a87b8f1497 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -54,7 +54,7 @@ public final class CDCJobPreparer {
      * @param jobItemContext job item context
      */
     public void prepare(final CDCJobItemContext jobItemContext) {
-        if (null == jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem())) {
+        if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()) {
             jobAPI.persistJobItemProgress(jobItemContext);
         }
         if (jobItemContext.isStopping()) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index 75fb805a811..b65b5632713 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -55,7 +55,7 @@ public interface GovernanceRepositoryAPI {
      * @param shardingItem sharding item
      * @return job item progress
      */
-    String getJobItemProgress(String jobId, int shardingItem);
+    Optional<String> getJobItemProgress(String jobId, int shardingItem);
     
     /**
      * Get latest check job id.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
index f459fbcd91e..8e47caca94c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/InventoryIncrementalJobAPI.java
@@ -30,6 +30,7 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -60,7 +61,7 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
     Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(PipelineJobConfiguration pipelineJobConfig);
     
     @Override
-    InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
+    Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(String jobId, int shardingItem);
     
     /**
      * Get job infos.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 0508ad0aac8..2628da172ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -131,7 +131,7 @@ public interface PipelineJobAPI extends TypedSPI {
      * @param shardingItem sharding item
      * @return job item progress, may be null
      */
-    PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
+    Optional<? extends PipelineJobItemProgress> getJobItemProgress(String jobId, int shardingItem);
     
     /**
      * Update job item status.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 0398d914d5a..58b91b400d6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
-import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.PipelineDataConsistencyChecker;
@@ -96,11 +95,9 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         String jobId = jobConfig.getJobId();
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
-            InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, each);
-            if (null != jobItemProgress) {
-                jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
-            }
-            map.put(each, jobItemProgress);
+            Optional<InventoryIncrementalJobItemProgress> jobItemProgress = getJobItemProgress(jobId, each);
+            jobItemProgress.ifPresent(progress -> progress.setActive(!jobConfigPOJO.isDisabled()));
+            map.put(each, jobItemProgress.orElse(null));
         }, LinkedHashMap::putAll);
     }
     
@@ -157,19 +154,19 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     }
     
     @Override
-    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
-        String data = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
-        return Strings.isNullOrEmpty(data) ? null : jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(data, YamlInventoryIncrementalJobItemProgress.class));
+    public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
+        Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+        return progress.map(s -> jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(s, YamlInventoryIncrementalJobItemProgress.class)));
     }
     
     @Override
     public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
-        InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
-        if (null == jobItemProgress) {
+        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = getJobItemProgress(jobId, shardingItem);
+        if (!jobItemProgress.isPresent()) {
             return;
         }
-        jobItemProgress.setStatus(status);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress)));
+        jobItemProgress.get().setStatus(status);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get())));
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 37c27a7942e..d28a50a2519 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -61,8 +61,9 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     }
     
     @Override
-    public String getJobItemProgress(final String jobId, final int shardingItem) {
-        return repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId, shardingItem));
+    public Optional<String> getJobItemProgress(final String jobId, final int shardingItem) {
+        String text = repository.getDirectly(PipelineMetaDataNode.getJobOffsetItemPath(jobId, shardingItem));
+        return Strings.isNullOrEmpty(text) ? Optional.empty() : Optional.of(text);
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 406a67b37f8..88a11c31209 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -24,11 +24,14 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.ConsistencyCheckTasksRunner;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
+import java.util.Optional;
+
 /**
  * Consistency check job.
  */
@@ -38,8 +41,9 @@ public final class ConsistencyCheckJob extends AbstractSimplePipelineJob {
     @Override
     public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        ConsistencyCheckJobItemProgress jobItemProgress = (ConsistencyCheckJobItemProgress) getJobAPI().getJobItemProgress(jobConfig.getJobId(), shardingContext.getShardingItem());
-        return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress);
+        ConsistencyCheckJobAPI jobAPI = (ConsistencyCheckJobAPI) getJobAPI();
+        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingContext.getShardingItem());
+        return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress.orElse(null));
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index b4d781ea6ba..463902206e8 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl;
 
-import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
@@ -30,7 +29,6 @@ import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContex
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
@@ -96,8 +94,8 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
         String parentJobId = param.getJobId();
         Optional<String> latestCheckJobId = repositoryAPI.getLatestCheckJobId(parentJobId);
         if (latestCheckJobId.isPresent()) {
-            PipelineJobItemProgress progress = getJobItemProgress(latestCheckJobId.get(), 0);
-            if (null == progress || JobStatus.FINISHED != progress.getStatus()) {
+            Optional<ConsistencyCheckJobItemProgress> progress = getJobItemProgress(latestCheckJobId.get(), 0);
+            if (!progress.isPresent() || JobStatus.FINISHED != progress.get().getStatus()) {
                 log.info("check job already exists and status is not FINISHED, progress={}", progress);
                 throw new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
             }
@@ -141,26 +139,26 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
     }
     
     @Override
-    public ConsistencyCheckJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
-        String progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
-        return Strings.isNullOrEmpty(progress) ? null : swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobItemProgress.class, true));
+    public Optional<ConsistencyCheckJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
+        Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+        return progress.map(s -> swapper.swapToObject(YamlEngine.unmarshal(s, YamlConsistencyCheckJobItemProgress.class, true)));
     }
     
     @Override
     public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
-        ConsistencyCheckJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
-        if (null == jobItemProgress) {
+        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = getJobItemProgress(jobId, shardingItem);
+        if (!jobItemProgress.isPresent()) {
             log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
             return;
         }
-        jobItemProgress.setStatus(status);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress)));
+        jobItemProgress.get().setStatus(status);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
     }
     
     @Override
     public void startDisabledJob(final String jobId) {
-        PipelineJobItemProgress jobProgress = getJobItemProgress(jobId, 0);
-        if (null != jobProgress && JobStatus.FINISHED == jobProgress.getStatus()) {
+        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = getJobItemProgress(jobId, 0);
+        if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
             log.info("job status is FINISHED, ignore, jobId={}", jobId);
             return;
         }
@@ -223,11 +221,12 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
         Optional<String> latestCheckJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getLatestCheckJobId(parentJobId);
         ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = latestCheckJobId.get();
-        ConsistencyCheckJobItemProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
+        Optional<ConsistencyCheckJobItemProgress> progressOptional = getJobItemProgress(checkJobId, 0);
         ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
-        if (null == jobItemProgress) {
+        if (!progressOptional.isPresent()) {
             return result;
         }
+        ConsistencyCheckJobItemProgress jobItemProgress = progressOptional.get();
         LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
         if (null == jobItemProgress.getRecordsCount() || null == jobItemProgress.getCheckedRecordsCount()) {
             result.setFinishedPercentage(0);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 59d14624d0f..3ac7d7559f5 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,6 +37,7 @@ import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigur
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
 import java.sql.SQLException;
+import java.util.Optional;
 
 /**
  * Migration job.
@@ -56,10 +57,10 @@ public final class MigrationJob extends AbstractSimplePipelineJob {
     protected InventoryIncrementalJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
         int shardingItem = shardingContext.getShardingItem();
         MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        InventoryIncrementalJobItemProgress initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+        Optional<InventoryIncrementalJobItemProgress> initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
         MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
         MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
-        return new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
+        return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 1edd4529270..ae007aa6a0c 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -49,6 +49,7 @@ import org.apache.shardingsphere.mode.lock.GlobalLockDefinition;
 
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.Optional;
 
 /**
  * Migration job preparer.
@@ -91,7 +92,7 @@ public final class MigrationJobPreparer {
         MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String lockName = "prepare-" + jobConfig.getJobId();
         LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
-        if (null == jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem())) {
+        if (!jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()).isPresent()) {
             jobAPI.persistJobItemProgress(jobItemContext);
         }
         LockDefinition lockDefinition = new GlobalLockDefinition(lockName);
@@ -99,9 +100,9 @@ public final class MigrationJobPreparer {
         if (lockContext.tryLock(lockDefinition, 180000)) {
             log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobConfig.getJobId(), jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis);
             try {
-                InventoryIncrementalJobItemProgress jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
-                JobStatus currentStatus = null != jobItemProgress ? jobItemProgress.getStatus() : null;
-                boolean prepareFlag = null == jobItemProgress || JobStatus.PREPARING.equals(currentStatus) || JobStatus.RUNNING.equals(currentStatus)
+                Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+                JobStatus currentStatus = jobItemProgress.map(InventoryIncrementalJobItemProgress::getStatus).orElse(null);
+                boolean prepareFlag = !jobItemProgress.isPresent() || JobStatus.PREPARING.equals(currentStatus) || JobStatus.RUNNING.equals(currentStatus)
                         || JobStatus.PREPARING_FAILURE.equals(currentStatus);
                 if (prepareFlag) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
diff --git a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
index 53e2c2c29b0..8617049ab4b 100644
--- a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
+++ b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/fixture/FixtureCDCJobAPI.java
@@ -108,7 +108,7 @@ public final class FixtureCDCJobAPI implements InventoryIncrementalJobAPI, CDCJo
     }
     
     @Override
-    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+    public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
         return null;
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 3caab14c0da..a836881ca50 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -29,12 +29,12 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -48,6 +48,7 @@ import java.sql.Types;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -57,7 +58,6 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -75,8 +75,9 @@ public final class GovernanceRepositoryAPIImplTest {
     public void assertPersistJobProgress() {
         MigrationJobItemContext jobItemContext = mockJobItemContext();
         governanceRepositoryAPI.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), "testValue");
-        String actual = governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
-        assertThat(actual, is("testValue"));
+        Optional<String> actual = governanceRepositoryAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), is("testValue"));
     }
     
     @Test
@@ -94,8 +95,8 @@ public final class GovernanceRepositoryAPIImplTest {
     public void assertDeleteJob() {
         governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT + "/1", "");
         governanceRepositoryAPI.deleteJob("1");
-        String actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
-        assertNull(actual);
+        Optional<String> actual = governanceRepositoryAPI.getJobItemProgress("1", 0);
+        assertFalse(actual.isPresent());
     }
     
     @Test
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
index 80d6ddfa275..331cd7c3487 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
@@ -65,7 +65,6 @@ import java.util.Optional;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -235,9 +234,9 @@ public final class MigrationJobAPITest {
         MigrationJobItemContext jobItemContext = PipelineContextUtil.mockMigrationJobItemContext(jobConfig);
         jobAPI.persistJobItemProgress(jobItemContext);
         jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
-        InventoryIncrementalJobItemProgress actual = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
-        assertNotNull(actual);
-        assertThat(actual.getStatus(), is(JobStatus.FINISHED));
+        Optional<InventoryIncrementalJobItemProgress> actual = jobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        assertTrue(actual.isPresent());
+        assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));
     }
     
     @Test