You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/11/05 12:39:38 UTC

[shardingsphere] branch master updated: Revise 21929 (#21975)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b150a056460 Revise 21929 (#21975)
b150a056460 is described below

commit b150a056460aee2bae2af16f011c948618ca303d
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sat Nov 5 20:39:31 2022 +0800

    Revise 21929 (#21975)
    
    * Remove unused start_time property
    
    * Replace the explicit constructor with @RequiredArgsConstructor; reorder fields
    
    * Rename JobItemProgressInfo to JobItemInfo
    
    * Improve code style of JobStatusQueryResultSet classes
    
    * Rename ConsistencyCheckJobProgress to ConsistencyCheckJobItemProgress
    
    * Clean comment
---
 .../ShowMigrationCheckStatusQueryResultSet.java    | 21 ++++------
 .../ShowMigrationJobStatusQueryResultSet.java      | 22 +++++-----
 .../pipeline/api/ConsistencyCheckJobPublicAPI.java |  8 ++--
 .../api/InventoryIncrementalJobPublicAPI.java      |  8 ++--
 .../StandardPipelineDataSourceConfiguration.java   |  1 -
 ...s.java => ConsistencyCheckJobItemProgress.java} |  5 ++-
 ...sInfo.java => ConsistencyCheckJobItemInfo.java} |  6 ++-
 ...o.java => InventoryIncrementalJobItemInfo.java} | 18 +++-----
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 16 ++++----
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  4 +-
 ...va => YamlConsistencyCheckJobItemProgress.java} |  4 +-
 ...amlConsistencyCheckJobItemProgressSwapper.java} | 14 +++----
 .../ConsistencyCheckJobAPIImpl.java                | 48 +++++++++++-----------
 13 files changed, 83 insertions(+), 92 deletions(-)

diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
index fa1afc153ad..7068bdabe12 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusQueryResultSet.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query;
 
 import org.apache.shardingsphere.data.pipeline.api.ConsistencyCheckJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
 import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
@@ -27,9 +27,8 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -44,15 +43,13 @@ public final class ShowMigrationCheckStatusQueryResultSet implements DatabaseDis
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
         ShowMigrationCheckStatusStatement checkMigrationStatement = (ShowMigrationCheckStatusStatement) sqlStatement;
-        ConsistencyCheckJobProgressInfo progressInfo = JOB_API.getJobProgressInfo(checkMigrationStatement.getJobId());
-        List<Collection<Object>> result = new LinkedList<>();
-        String checkResult = null == progressInfo.getCheckSuccess() ? "" : progressInfo.getCheckSuccess().toString();
-        result.add(Arrays.asList(Optional.ofNullable(progressInfo.getTableNames()).orElse(""), checkResult, String.valueOf(progressInfo.getFinishedPercentage()),
-                progressInfo.getRemainingSeconds(),
-                Optional.ofNullable(progressInfo.getCheckBeginTime()).orElse(""),
-                Optional.ofNullable(progressInfo.getCheckEndTime()).orElse(""),
-                progressInfo.getDurationSeconds(), Optional.ofNullable(progressInfo.getErrorMessage()).orElse("")));
-        data = result.iterator();
+        ConsistencyCheckJobItemInfo info = JOB_API.getJobItemInfo(checkMigrationStatement.getJobId());
+        String checkResult = null == info.getCheckSuccess() ? "" : info.getCheckSuccess().toString();
+        Collection<Object> result = Arrays.asList(Optional.ofNullable(info.getTableNames()).orElse(""), checkResult,
+                String.valueOf(info.getFinishedPercentage()), info.getRemainingSeconds(),
+                Optional.ofNullable(info.getCheckBeginTime()).orElse(""), Optional.ofNullable(info.getCheckEndTime()).orElse(""),
+                info.getDurationSeconds(), Optional.ofNullable(info.getErrorMessage()).orElse(""));
+        data = Collections.singletonList(result).iterator();
     }
     
     @Override
diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index 9f288faef45..fd22fe35d92 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query;
 import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
@@ -46,19 +46,12 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
         long currentTimeMillis = System.currentTimeMillis();
-        List<InventoryIncrementalJobItemProgressInfo> jobProgress = JOB_API.getJobProgressInfos(((ShowMigrationStatusStatement) sqlStatement).getJobId());
-        data = jobProgress.stream().map(each -> {
+        List<InventoryIncrementalJobItemInfo> jobItemInfos = JOB_API.getJobItemInfos(((ShowMigrationStatusStatement) sqlStatement).getJobId());
+        data = jobItemInfos.stream().map(each -> {
             Collection<Object> result = new LinkedList<>();
             result.add(each.getShardingItem());
             InventoryIncrementalJobItemProgress jobItemProgress = each.getJobItemProgress();
-            if (null == jobItemProgress) {
-                result.add("");
-                result.add("");
-                result.add("");
-                result.add("");
-                result.add("");
-                result.add("");
-            } else {
+            if (null != jobItemProgress) {
                 result.add(jobItemProgress.getDataSourceName());
                 result.add(jobItemProgress.getStatus());
                 result.add(jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
@@ -70,6 +63,13 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
                     incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
                 }
                 result.add(incrementalIdleSeconds);
+            } else {
+                result.add("");
+                result.add("");
+                result.add("");
+                result.add("");
+                result.add("");
+                result.add("");
             }
             result.add(each.getErrorMessage());
             return result;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
index 2e59d9e3976..039b2a0342c 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ConsistencyCheckJobPublicAPI.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -62,10 +62,10 @@ public interface ConsistencyCheckJobPublicAPI extends PipelineJobPublicAPI, Requ
     void stopByParentJobId(String parentJobId);
     
     /**
-     * Get consistency job progress info.
+     * Get consistency job item info.
      *
      * @param parentJobId parent job id
-     * @return consistency job progress info
+     * @return consistency job item info
      */
-    ConsistencyCheckJobProgressInfo getJobProgressInfo(String parentJobId);
+    ConsistencyCheckJobItemInfo getJobItemInfo(String parentJobId);
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
index d39f7ad683a..da75b1a92f3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/InventoryIncrementalJobPublicAPI.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.api;
 
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
-import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
@@ -77,12 +77,12 @@ public interface InventoryIncrementalJobPublicAPI extends PipelineJobPublicAPI,
     void commit(String jobId);
     
     /**
-     * Get job progress info list.
+     * Get job item infos.
      *
      * @param jobId job id
-     * @return all sharding item progress infos
+     * @return job item infos
      */
-    List<InventoryIncrementalJobItemProgressInfo> getJobProgressInfos(String jobId);
+    List<InventoryIncrementalJobItemInfo> getJobItemInfos(String jobId);
     
     /**
      * List all data consistency check algorithms from SPI.
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index cb708db902e..adb3c3c6850 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -71,7 +71,6 @@ public final class StandardPipelineDataSourceConfiguration implements PipelineDa
         for (String each : Arrays.asList("minPoolSize", "minimumIdle")) {
             yamlConfig.put(each, "1");
         }
-        // TODO jdbcUrl not find now, can be deleted after confirmation
         if (yamlConfig.containsKey("jdbcUrl")) {
             yamlConfig.put("url", yamlConfig.get("jdbcUrl"));
             yamlConfig.remove("jdbcUrl");
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
similarity index 88%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
index e3297179140..dcd0195bfa3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/ConsistencyCheckJobItemProgress.java
@@ -23,12 +23,13 @@ import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 
 /**
- * Data consistency check job progress.
+ * Data consistency check job item progress.
  */
+// TODO use final for fields
 @Getter
 @Setter
 @ToString
-public final class ConsistencyCheckJobProgress implements PipelineJobItemProgress {
+public final class ConsistencyCheckJobItemProgress implements PipelineJobItemProgress {
     
     private JobStatus status = JobStatus.RUNNING;
     
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
similarity index 87%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
index e8027a8d9a6..33252fab5e8 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobProgressInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
@@ -21,11 +21,13 @@ import lombok.Getter;
 import lombok.Setter;
 
 /**
- * Consistency check job progress info.
+ * Consistency check job item info.
  */
+// TODO use final for fields
+// TODO embed ConsistencyCheckJobItemProgress to reduce fields
 @Getter
 @Setter
-public final class ConsistencyCheckJobProgressInfo {
+public final class ConsistencyCheckJobItemInfo {
     
     private String tableNames;
     
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemProgressInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
similarity index 69%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemProgressInfo.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
index 101fb573098..2521ef3652e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemProgressInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
@@ -18,27 +18,21 @@
 package org.apache.shardingsphere.data.pipeline.api.pojo;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 
 /**
- * Inventory incremental job item progress info.
+ * Inventory incremental job item info.
  */
+@RequiredArgsConstructor
 @Getter
-public final class InventoryIncrementalJobItemProgressInfo {
+public final class InventoryIncrementalJobItemInfo {
     
     private final int shardingItem;
     
-    private final String errorMessage;
+    private final InventoryIncrementalJobItemProgress jobItemProgress;
     
     private final long startTimeMillis;
     
-    private final InventoryIncrementalJobItemProgress jobItemProgress;
-    
-    public InventoryIncrementalJobItemProgressInfo(final int shardingItem, final String errorMessage, final long startTimeMills,
-                                                   final InventoryIncrementalJobItemProgress jobItemProgress) {
-        this.shardingItem = shardingItem;
-        this.errorMessage = errorMessage;
-        this.startTimeMillis = startTimeMills;
-        this.jobItemProgress = jobItemProgress;
-    }
+    private final String errorMessage;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index a9895b1fa97..10290c57c7c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
-import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -129,16 +129,16 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     }
     
     @Override
-    public List<InventoryIncrementalJobItemProgressInfo> getJobProgressInfos(final String jobId) {
+    public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         PipelineJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
         long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
-        List<InventoryIncrementalJobItemProgressInfo> result = new ArrayList<>(jobProgress.size());
+        List<InventoryIncrementalJobItemInfo> result = new ArrayList<>(jobProgress.size());
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
-            InventoryIncrementalJobItemProgressInfo progressInfo = new InventoryIncrementalJobItemProgressInfo(shardingItem, errorMessage, startTimeMillis, entry.getValue());
+            InventoryIncrementalJobItemInfo progressInfo = new InventoryIncrementalJobItemInfo(shardingItem, entry.getValue(), startTimeMillis, errorMessage);
             if (null == entry.getValue()) {
                 continue;
             }
@@ -151,15 +151,15 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
         InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
         InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
-        jobItemProgress.setStatus(jobItemContext.getStatus());
-        jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
-        jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
+        jobItemProgress.setStatus(context.getStatus());
+        jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
+        jobItemProgress.setDataSourceName(context.getDataSourceName());
         jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
         jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
         jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
         jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
         String value = YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress));
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(), context.getShardingItem(), value);
     }
     
     private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 4c1b8aae6fc..0daae87d451 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -109,7 +109,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         jobConfigPOJO.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration(jobConfig)));
         String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
         jobConfigPOJO.getProps().setProperty("create_time", createTimeFormat);
-        jobConfigPOJO.getProps().setProperty("start_time", createTimeFormat);
         jobConfigPOJO.getProps().setProperty("start_time_millis", System.currentTimeMillis() + "");
         return YamlEngine.marshal(jobConfigPOJO);
     }
@@ -125,10 +124,9 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
         jobConfigPOJO.setDisabled(false);
+        jobConfigPOJO.getProps().setProperty("start_time_millis", System.currentTimeMillis() + "");
         jobConfigPOJO.getProps().remove("stop_time");
         jobConfigPOJO.getProps().remove("stop_time_millis");
-        jobConfigPOJO.getProps().setProperty("start_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
-        jobConfigPOJO.getProps().setProperty("start_time_millis", System.currentTimeMillis() + "");
         String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
         PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
similarity index 90%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index b5d1940ddfb..4b610032745 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -22,11 +22,11 @@ import lombok.Setter;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
- * Yaml data consistency check job progress.
+ * Yaml data consistency check job item progress.
  */
 @Getter
 @Setter
-public final class YamlConsistencyCheckJobProgress implements YamlConfiguration {
+public final class YamlConsistencyCheckJobItemProgress implements YamlConfiguration {
     
     private String status;
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
similarity index 74%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index a3f5dcf20b1..9f4dabdd9ae 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -18,17 +18,17 @@
 package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
 
 /**
- * YAML data check job progress swapper.
+ * YAML data consistency check job item progress swapper.
  */
-public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobProgress, ConsistencyCheckJobProgress> {
+public final class YamlConsistencyCheckJobItemProgressSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobItemProgress, ConsistencyCheckJobItemProgress> {
     
     @Override
-    public YamlConsistencyCheckJobProgress swapToYamlConfiguration(final ConsistencyCheckJobProgress data) {
-        YamlConsistencyCheckJobProgress result = new YamlConsistencyCheckJobProgress();
+    public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final ConsistencyCheckJobItemProgress data) {
+        YamlConsistencyCheckJobItemProgress result = new YamlConsistencyCheckJobItemProgress();
         result.setStatus(data.getStatus().name());
         result.setRecordsCount(data.getRecordsCount());
         result.setCheckedRecordsCount(data.getCheckedRecordsCount());
@@ -39,8 +39,8 @@ public final class YamlConsistencyCheckJobProgressSwapper implements YamlConfigu
     }
     
     @Override
-    public ConsistencyCheckJobProgress swapToObject(final YamlConsistencyCheckJobProgress yamlConfig) {
-        ConsistencyCheckJobProgress result = new ConsistencyCheckJobProgress();
+    public ConsistencyCheckJobItemProgress swapToObject(final YamlConsistencyCheckJobItemProgress yamlConfig) {
+        ConsistencyCheckJobItemProgress result = new ConsistencyCheckJobItemProgress();
         result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
         result.setRecordsCount(yamlConfig.getRecordsCount());
         result.setCheckedRecordsCount(yamlConfig.getCheckedRecordsCount());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 4f4f66f74e3..2ef7f1db3b0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -30,9 +30,9 @@ import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContex
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobProgressInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateConsistencyCheckJobParameter;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -42,8 +42,8 @@ import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJob
 import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -67,7 +67,7 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     
     private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
     
-    private final YamlConsistencyCheckJobProgressSwapper swapper = new YamlConsistencyCheckJobProgressSwapper();
+    private final YamlConsistencyCheckJobItemProgressSwapper swapper = new YamlConsistencyCheckJobItemProgressSwapper();
     
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
@@ -112,33 +112,33 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     
     @Override
     public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
-        ConsistencyCheckJobItemContext checkJobItemContext = (ConsistencyCheckJobItemContext) jobItemContext;
-        ConsistencyCheckJobProgress jobProgress = new ConsistencyCheckJobProgress();
-        jobProgress.setStatus(jobItemContext.getStatus());
-        jobProgress.setCheckedRecordsCount(checkJobItemContext.getCheckedRecordsCount().get());
-        jobProgress.setRecordsCount(checkJobItemContext.getRecordsCount());
-        jobProgress.setCheckBeginTimeMillis(checkJobItemContext.getCheckBeginTimeMillis());
-        jobProgress.setCheckEndTimeMillis(checkJobItemContext.getCheckEndTimeMillis());
-        jobProgress.setTableNames(String.join(",", checkJobItemContext.getTableNames()));
-        YamlConsistencyCheckJobProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobProgress);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
+        ConsistencyCheckJobItemContext context = (ConsistencyCheckJobItemContext) jobItemContext;
+        ConsistencyCheckJobItemProgress jobItemProgress = new ConsistencyCheckJobItemProgress();
+        jobItemProgress.setStatus(context.getStatus());
+        jobItemProgress.setCheckedRecordsCount(context.getCheckedRecordsCount().get());
+        jobItemProgress.setRecordsCount(context.getRecordsCount());
+        jobItemProgress.setCheckBeginTimeMillis(context.getCheckBeginTimeMillis());
+        jobItemProgress.setCheckEndTimeMillis(context.getCheckEndTimeMillis());
+        jobItemProgress.setTableNames(String.join(",", context.getTableNames()));
+        YamlConsistencyCheckJobItemProgress yamlJobProgress = swapper.swapToYamlConfiguration(jobItemProgress);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(context.getJobId(), context.getShardingItem(), YamlEngine.marshal(yamlJobProgress));
     }
     
     @Override
-    public ConsistencyCheckJobProgress getJobItemProgress(final String jobId, final int shardingItem) {
+    public ConsistencyCheckJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
         String progress = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
-        return Strings.isNullOrEmpty(progress) ? null : swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobProgress.class, true));
+        return Strings.isNullOrEmpty(progress) ? null : swapper.swapToObject(YamlEngine.unmarshal(progress, YamlConsistencyCheckJobItemProgress.class, true));
     }
     
     @Override
     public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
-        ConsistencyCheckJobProgress jobProgress = getJobItemProgress(jobId, shardingItem);
-        if (null == jobProgress) {
+        ConsistencyCheckJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
+        if (null == jobItemProgress) {
             log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
             return;
         }
-        jobProgress.setStatus(status);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobProgress)));
+        jobItemProgress.setStatus(status);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress)));
     }
     
     @Override
@@ -169,12 +169,12 @@ public final class ConsistencyCheckJobAPIImpl extends AbstractPipelineJobAPIImpl
     }
     
     @Override
-    public ConsistencyCheckJobProgressInfo getJobProgressInfo(final String parentJobId) {
+    public ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
         Optional<String> checkLatestJobId = PipelineAPIFactory.getGovernanceRepositoryAPI().getCheckLatestJobId(parentJobId);
         ShardingSpherePreconditions.checkState(checkLatestJobId.isPresent(), () -> new ConsistencyCheckJobNotFoundException(parentJobId));
         String checkJobId = checkLatestJobId.get();
-        ConsistencyCheckJobProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
-        ConsistencyCheckJobProgressInfo result = new ConsistencyCheckJobProgressInfo();
+        ConsistencyCheckJobItemProgress jobItemProgress = getJobItemProgress(checkJobId, 0);
+        ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
         if (null == jobItemProgress) {
             return result;
         }