You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/06 04:55:39 UTC

[shardingsphere] branch master updated: Revise job progress persistence (#19909)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bdefa673d8 Revise job progress persistence (#19909)
4bdefa673d8 is described below

commit 4bdefa673d88a362f52fc02550841d58f8b73fe0
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sat Aug 6 12:55:34 2022 +0800

    Revise job progress persistence (#19909)
---
 .../listener/PipelineJobProgressListener.java}     | 24 +++--------
 .../pipeline/core/importer/AbstractImporter.java   | 10 ++---
 .../DefaultPipelineJobProgressListener.java}       | 24 ++++-------
 .../PipelineJobProgressPersistContext.java}        |  7 +++-
 .../PipelineJobProgressPersistService.java}        | 48 +++++++++++-----------
 .../data/pipeline/core/task/IncrementalTask.java   | 11 ++---
 .../data/pipeline/core/task/InventoryTask.java     |  6 +--
 .../scenario/rulealtered/RuleAlteredJob.java       |  5 ++-
 .../rulealtered/RuleAlteredJobPreparer.java        |  6 +--
 .../rulealtered/prepare/InventoryTaskSplitter.java |  6 +--
 .../scaling/core/job/importer/ImporterFactory.java | 10 ++---
 .../pipeline/mysql/importer/MySQLImporter.java     |  6 +--
 .../opengauss/importer/OpenGaussImporter.java      |  6 +--
 .../postgresql/importer/PostgreSQLImporter.java    |  6 +--
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  6 +--
 .../pipeline/core/fixture/FixtureImporter.java     |  4 +-
 ...ava => FixturePipelineJobProgressListener.java} | 16 ++------
 .../core/importer/AbstractImporterTest.java        |  4 +-
 .../pipeline/core/task/IncrementalTaskTest.java    |  4 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |  6 +--
 20 files changed, 93 insertions(+), 122 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
similarity index 67%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
index 09260f40aff..4ca18491ef8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
@@ -15,29 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.job.persist;
+package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
 
 /**
- * Job persist callback.
+ * Pipeline job progress listener.
  */
-public interface PipelineJobPersistCallback {
+public interface PipelineJobProgressListener {
     
     /**
-     * Get job id.
-     *
-     * @return job id
+     * Invoked when the job progress has been updated.
      */
-    String getJobId();
-    
-    /**
-     * Get sharding item.
-     *
-     * @return sharding item
-     */
-    int getShardingItem();
-    
-    /**
-     * Push persist event.
-     */
-    void pushPersistEvent();
+    void onProgressUpdated();
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index 97f914398cd..f0e3081376d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
@@ -63,15 +63,15 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
     
     private final PipelineChannel channel;
     
-    private final PipelineJobPersistCallback pipelineJobPersistCallback;
+    private final PipelineJobProgressListener jobProgressListener;
     
     protected AbstractImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                               final PipelineJobPersistCallback pipelineJobPersistCallback) {
+                               final PipelineJobProgressListener jobProgressListener) {
         this.importerConfig = importerConfig;
         this.dataSourceManager = dataSourceManager;
         this.channel = channel;
         pipelineSqlBuilder = PipelineSQLBuilderFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType());
-        this.pipelineJobPersistCallback = pipelineJobPersistCallback;
+        this.jobProgressListener = jobProgressListener;
     }
     
     @Override
@@ -92,7 +92,7 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
                 rowCount += records.size();
                 flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
                 channel.ack(records);
-                pipelineJobPersistCallback.pushPersistEvent();
+                jobProgressListener.onProgressUpdated();
                 if (0 == round % 50) {
                     log.info("importer write, round={}, rowCount={}", round, rowCount);
                 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
similarity index 59%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
index 08b2b1336a6..97b55c4e48a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
@@ -15,34 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.persist;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPersistService;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
 /**
- * Async job process persist callback.
+ * Default pipeline job progress listener implementation.
  */
 @RequiredArgsConstructor
-public final class AsyncPipelineJobPersistCallback implements PipelineJobPersistCallback {
+public final class DefaultPipelineJobProgressListener implements PipelineJobProgressListener {
     
     private final String jobId;
     
     private final int shardingItem;
     
     @Override
-    public String getJobId() {
-        return jobId;
-    }
-    
-    @Override
-    public int getShardingItem() {
-        return shardingItem;
-    }
-    
-    @Override
-    public void pushPersistEvent() {
-        RuleAlteredJobPersistService.triggerPersist(jobId, shardingItem);
+    public void onProgressUpdated() {
+        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
similarity index 87%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
index ea590ea94f0..b62d0b35ea9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.job.persist;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -23,9 +23,12 @@ import lombok.RequiredArgsConstructor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Pipeline job progress persist context.
+ */
 @Getter
 @RequiredArgsConstructor
-public final class PipelineJobPersistContext {
+public final class PipelineJobProgressPersistContext {
     
     private final String jobId;
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
similarity index 66%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index e7bc4d46a32..8380f08f906 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistContext;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobScheduler;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 
 import java.util.Collections;
@@ -33,17 +34,16 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Rule altered job persist service.
+ * Pipeline job progress persist service.
  */
-
 @Slf4j
-public final class RuleAlteredJobPersistService {
+public final class PipelineJobProgressPersistService {
     
-    private static final Map<String, Map<Integer, PipelineJobPersistContext>> JOB_PERSIST_MAP = new ConcurrentHashMap<>();
+    private static final Map<String, Map<Integer, PipelineJobProgressPersistContext>> JOB_PROGRESS_PERSIST_MAP = new ConcurrentHashMap<>();
     
     private static final GovernanceRepositoryAPI REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
     
-    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("pipeline-progress-persist-%d"));
     
     private static final long DELAY_SECONDS = 1;
     
@@ -52,43 +52,43 @@ public final class RuleAlteredJobPersistService {
     }
     
     /**
-     * Remove job schedule parameter by job id.
+     * Remove job progress persist context.
      *
      * @param jobId job id
      */
-    public static void removeJobPersistParameter(final String jobId) {
-        log.info("Remove job persist, job id: {}", jobId);
-        JOB_PERSIST_MAP.remove(jobId);
+    public static void removeJobProgressPersistContext(final String jobId) {
+        log.info("Remove job progress persist context, jobId={}", jobId);
+        JOB_PROGRESS_PERSIST_MAP.remove(jobId);
     }
     
     /**
-     * Add job schedule parameter.
+     * Add job progress persist context.
      *
      * @param jobId job id
      * @param shardingItem sharding item
      */
-    public static void addJobPersistParameter(final String jobId, final int shardingItem) {
-        log.info("Add job schedule, jobId={}, shardingItem={}", jobId, shardingItem);
-        JOB_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobPersistContext(jobId, shardingItem));
+    public static void addJobProgressPersistContext(final String jobId, final int shardingItem) {
+        log.info("Add job progress persist context, jobId={}, shardingItem={}", jobId, shardingItem);
+        JOB_PROGRESS_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobProgressPersistContext(jobId, shardingItem));
     }
     
     /**
-     * Persist job process, may not be implemented immediately, depending on persist interval.
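+     * Notify that progress has new events to persist; the flag is set here and the scheduled persist task performs the actual persistence later.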
      *
      * @param jobId job id
      * @param shardingItem sharding item
      */
-    public static void triggerPersist(final String jobId, final int shardingItem) {
-        Map<Integer, PipelineJobPersistContext> intervalParamMap = JOB_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
-        PipelineJobPersistContext parameter = intervalParamMap.get(shardingItem);
-        if (null == parameter) {
-            log.debug("Persist interval parameter is null, jobId={}, shardingItem={}", jobId, shardingItem);
+    public static void notifyPersist(final String jobId, final int shardingItem) {
+        Map<Integer, PipelineJobProgressPersistContext> persistContextMap = JOB_PROGRESS_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
+        PipelineJobProgressPersistContext persistContext = persistContextMap.get(shardingItem);
+        if (null == persistContext) {
+            log.debug("persistContext is null, jobId={}, shardingItem={}", jobId, shardingItem);
             return;
         }
-        parameter.getHasNewEvents().set(true);
+        persistContext.getHasNewEvents().set(true);
     }
     
-    private static void persist(final String jobId, final int shardingItem, final PipelineJobPersistContext persistContext) {
+    private static void persist(final String jobId, final int shardingItem, final PipelineJobProgressPersistContext persistContext) {
         Long beforePersistingProgressMillis = persistContext.getBeforePersistingProgressMillis().get();
         if ((null == beforePersistingProgressMillis || System.currentTimeMillis() - beforePersistingProgressMillis < TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
                 && !persistContext.getHasNewEvents().get()) {
@@ -116,7 +116,7 @@ public final class RuleAlteredJobPersistService {
         
         @Override
         public void run() {
-            for (Entry<String, Map<Integer, PipelineJobPersistContext>> entry : JOB_PERSIST_MAP.entrySet()) {
+            for (Entry<String, Map<Integer, PipelineJobProgressPersistContext>> entry : JOB_PROGRESS_PERSIST_MAP.entrySet()) {
                 entry.getValue().forEach((shardingItem, persistContext) -> {
                     persist(entry.getKey(), shardingItem, persistContext);
                 });
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 55a8ab607a1..abafe1c9600 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -68,14 +68,15 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     
     public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
                            final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
-                           final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine, final PipelineJobPersistCallback pipelineJobPersistCallback) {
+                           final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine,
+                           final PipelineJobProgressListener jobProgressListener) {
         this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
         taskId = dumperConfig.getDataSourceName();
         IngestPosition<?> position = dumperConfig.getPosition();
         progress = createIncrementalTaskProgress(position);
         channel = createChannel(concurrency, pipelineChannelCreator, progress);
         dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
-        importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+        importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, jobProgressListener);
     }
     
     private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position) {
@@ -93,10 +94,10 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     }
     
     private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                                                 final PipelineJobPersistCallback pipelineJobPersistCallback) {
+                                                 final PipelineJobProgressListener jobProgressListener) {
         Collection<Importer> result = new LinkedList<>();
         for (int i = 0; i < concurrency; i++) {
-            result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback));
+            result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, jobProgressListener));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index b04caf2da2b..85a1f9cdfde 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -68,12 +68,12 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
                          final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
                          final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader,
-                         final ExecuteEngine importerExecuteEngine, final PipelineJobPersistCallback pipelineJobPersistCallback) {
+                         final ExecuteEngine importerExecuteEngine, final PipelineJobProgressListener jobProgressListener) {
         this.importerExecuteEngine = importerExecuteEngine;
         taskId = generateTaskId(inventoryDumperConfig);
         channel = createChannel(pipelineChannelCreator);
         dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
-        importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+        importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, jobProgressListener);
         position = inventoryDumperConfig.getPosition();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index f2f763efd69..ede339c7606 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -80,7 +81,7 @@ public final class RuleAlteredJob implements SimpleJob, PipelineJob {
         RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext);
         jobScheduler.start();
         jobSchedulerMap.put(shardingItem, jobScheduler);
-        RuleAlteredJobPersistService.addJobPersistParameter(jobId, shardingItem);
+        PipelineJobProgressPersistService.addJobProgressPersistContext(jobId, shardingItem);
     }
     
     /**
@@ -101,6 +102,6 @@ public final class RuleAlteredJob implements SimpleJob, PipelineJob {
             each.stop();
         }
         jobSchedulerMap.clear();
-        RuleAlteredJobPersistService.removeJobPersistParameter(jobId);
+        PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 7c84cf39468..a7e32400aaa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredExc
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
@@ -53,7 +54,6 @@ import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCre
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
@@ -192,9 +192,9 @@ public final class RuleAlteredJobPreparer {
         PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
         taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
-        AsyncPipelineJobPersistCallback persistCallback = new AsyncPipelineJobPersistCallback(jobContext.getJobId(), jobContext.getShardingItem());
+        DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobContext.getJobId(), jobContext.getShardingItem());
         IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
-                taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, persistCallback);
+                taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, jobProgressListener);
         jobContext.getIncrementalTasks().add(incrementalTask);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index fb4609f1dec..7d681d90413 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
@@ -46,7 +47,6 @@ import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -77,9 +77,9 @@ public final class InventoryTaskSplitter {
         DataSource dataSource = jobContext.getSourceDataSource();
         PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader();
         ExecuteEngine importerExecuteEngine = jobContext.getRuleAlteredContext().getImporterExecuteEngine();
-        AsyncPipelineJobPersistCallback persistCallback = new AsyncPipelineJobPersistCallback(jobContext.getJobId(), jobContext.getShardingItem());
+        DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobContext.getJobId(), jobContext.getShardingItem());
         for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig())) {
-            result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine, persistCallback));
+            result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine, jobProgressListener));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
index 16999111f08..fbe1bfeb305 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
@@ -22,7 +22,7 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -42,16 +42,16 @@ public final class ImporterFactory {
      * @param importerConfig importer configuration
      * @param dataSourceManager data source manager
      * @param channel channel
-     * @param pipelineJobPersistCallback job persist callback
+     * @param jobProgressListener job progress listener
      * @return importer
      */
     @SneakyThrows(ReflectiveOperationException.class)
     public static Importer createImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                                          final PipelineJobPersistCallback pipelineJobPersistCallback) {
+                                          final PipelineJobProgressListener jobProgressListener) {
         String databaseType = importerConfig.getDataSourceConfig().getDatabaseType().getType();
         ScalingEntry scalingEntry = ScalingEntryFactory.getInstance(databaseType);
         Constructor<? extends Importer> constructor = scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class, PipelineDataSourceManager.class, PipelineChannel.class,
-                PipelineJobPersistCallback.class);
-        return constructor.newInstance(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+                PipelineJobProgressListener.class);
+        return constructor.newInstance(importerConfig, dataSourceManager, channel, jobProgressListener);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index 002f0869450..09af98ea2fe 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
 
@@ -29,8 +29,8 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
 public final class MySQLImporter extends AbstractImporter {
     
     public MySQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                         final PipelineJobPersistCallback pipelineJobPersistCallback) {
-        super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+                         final PipelineJobProgressListener jobProgressListener) {
+        super(importerConfig, dataSourceManager, channel, jobProgressListener);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
index c864e5c04a4..831c3510b32 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -30,8 +30,8 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
 public final class OpenGaussImporter extends AbstractImporter {
     
     public OpenGaussImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                             final PipelineJobPersistCallback pipelineJobPersistCallback) {
-        super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+                             final PipelineJobProgressListener jobProgressListener) {
+        super(importerConfig, dataSourceManager, channel, jobProgressListener);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
index 26c1bac7aa6..571915d5c16 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -30,8 +30,8 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
 public final class PostgreSQLImporter extends AbstractImporter {
     
     public PostgreSQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                              final PipelineJobPersistCallback pipelineJobPersistCallback) {
-        super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
+                              final PipelineJobProgressListener jobProgressListener) {
+        super(importerConfig, dataSourceManager, channel, jobProgressListener);
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 7a2100d2e12..83615933662 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -160,7 +160,7 @@ public final class GovernanceRepositoryAPIImplTest {
         PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
         PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
         return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
-                new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
+                new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
     }
     
     private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
@@ -168,6 +168,6 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
-                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
+                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index 4932eb50cea..fdbd30b03dc 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -19,14 +19,14 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 
 public final class FixtureImporter implements Importer {
     
     public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
-                           final PipelineJobPersistCallback pipelineJobPersistCallback) {
+                           final PipelineJobProgressListener jobProgressListener) {
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
similarity index 70%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
index 00e1a3f30d8..8a2a7a11702 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
@@ -17,21 +17,11 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 
-public final class FixturePipelineJobPersistCallback implements PipelineJobPersistCallback {
+public final class FixturePipelineJobProgressListener implements PipelineJobProgressListener {
     
     @Override
-    public String getJobId() {
-        return null;
-    }
-    
-    @Override
-    public int getShardingItem() {
-        return 0;
-    }
-    
-    @Override
-    public void pushPersistEvent() {
+    public void onProgressUpdated() {
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 22b9c7c2b70..92d5a4632b1 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -30,7 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,7 +82,7 @@ public final class AbstractImporterTest {
     
     @Before
     public void setUp() throws SQLException {
-        jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobPersistCallback()) {
+        jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobProgressListener()) {
             
             @Override
             protected String getSchemaName(final String logicTableName) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index af11ca297ec..e81a8913b09 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -55,7 +55,7 @@ public final class IncrementalTaskTest {
         PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
                 PipelineContextUtil.getPipelineChannelCreator(),
-                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
+                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener());
     }
     
     @Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index dc14ac4d84c..afde39ffba6 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -73,7 +73,7 @@ public final class InventoryTaskTest {
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(),
-                        DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback())) {
+                        DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener())) {
             inventoryTask.start();
         }
     }
@@ -88,7 +88,7 @@ public final class InventoryTaskTest {
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(),
-                        new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback())) {
+                        new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobProgressListener())) {
             inventoryTask.start();
             assertThat(inventoryTask.getProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
         }