You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/08/01 03:01:30 UTC

[shardingsphere] branch master updated: Extract job progress persistence from RuleAlteredJobSchedulerCenter (#19695)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1633afeefe7 Extract job progress persistence from RuleAlteredJobSchedulerCenter (#19695)
1633afeefe7 is described below

commit 1633afeefe71419b052911baa357021c4f3e77d8
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Aug 1 11:01:23 2022 +0800

    Extract job progress persistence from RuleAlteredJobSchedulerCenter (#19695)
---
 .../job/persist/PipelineJobPersistCallback.java    |  41 +++----
 .../api/job/persist/PipelineJobPersistContext.java |  30 ++----
 .../pipeline/core/importer/AbstractImporter.java   |   8 +-
 .../data/pipeline/core/task/IncrementalTask.java   |  10 +-
 .../data/pipeline/core/task/InventoryTask.java     |   5 +-
 .../rulealtered/RuleAlteredJobPersistService.java  | 119 +++++++++++++++++++++
 .../rulealtered/RuleAlteredJobPreparer.java        |  19 ++--
 .../rulealtered/RuleAlteredJobSchedulerCenter.java |  37 ++-----
 .../rulealtered/prepare/InventoryTaskSplitter.java |   4 +-
 .../scaling/core/job/importer/ImporterFactory.java |  10 +-
 .../persist/AsyncPipelineJobPersistCallback.java   |  30 +++---
 .../pipeline/mysql/importer/MySQLImporter.java     |   6 +-
 .../opengauss/importer/OpenGaussImporter.java      |   6 +-
 .../postgresql/importer/PostgreSQLImporter.java    |   6 +-
 .../data/pipeline/cases/base/BaseITCase.java       |  11 +-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |   5 +-
 .../pipeline/core/fixture/FixtureImporter.java     |   4 +-
 ...java => FixturePipelineJobPersistCallback.java} |  19 ++--
 .../core/importer/AbstractImporterTest.java        |   3 +-
 .../pipeline/core/task/IncrementalTaskTest.java    |   3 +-
 .../data/pipeline/core/task/InventoryTaskTest.java |   5 +-
 21 files changed, 253 insertions(+), 128 deletions(-)

diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
similarity index 51%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
index 214ad50cccf..09260f40aff 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistCallback.java
@@ -15,28 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.api.job.persist;
 
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-
-public final class FixtureImporter implements Importer {
-    
-    public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-    }
+/**
+ * Job persist callback.
+ */
+public interface PipelineJobPersistCallback {
     
-    @Override
-    public void start() {
-    }
+    /**
+     * Get job id.
+     *
+     * @return job id
+     */
+    String getJobId();
     
-    @Override
-    public void stop() {
-    }
+    /**
+     * Get sharding item.
+     *
+     * @return sharding item
+     */
+    int getShardingItem();
     
-    @Override
-    public void run() {
-        start();
-    }
+    /**
+     * Push persist event.
+     */
+    void pushPersistEvent();
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
similarity index 51%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
index 214ad50cccf..2f0b185e097 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/PipelineJobPersistContext.java
@@ -15,28 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.data.pipeline.api.job.persist;
 
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 
-public final class FixtureImporter implements Importer {
-    
-    public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-    }
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Getter
+@RequiredArgsConstructor
+public final class PipelineJobPersistContext {
     
-    @Override
-    public void start() {
-    }
+    private final String jobId;
     
-    @Override
-    public void stop() {
-    }
+    private final int shardingItem;
     
-    @Override
-    public void run() {
-        start();
-    }
+    private final AtomicBoolean alreadyPersisted = new AtomicBoolean(false);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
index c606d4ef103..97f914398cd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporter.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
@@ -62,11 +63,15 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
     
     private final PipelineChannel channel;
     
-    protected AbstractImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+    private final PipelineJobPersistCallback pipelineJobPersistCallback;
+    
+    protected AbstractImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+                               final PipelineJobPersistCallback pipelineJobPersistCallback) {
         this.importerConfig = importerConfig;
         this.dataSourceManager = dataSourceManager;
         this.channel = channel;
         pipelineSqlBuilder = PipelineSQLBuilderFactory.getInstance(importerConfig.getDataSourceConfig().getDatabaseType().getType());
+        this.pipelineJobPersistCallback = pipelineJobPersistCallback;
     }
     
     @Override
@@ -87,6 +92,7 @@ public abstract class AbstractImporter extends AbstractLifecycleExecutor impleme
                 rowCount += records.size();
                 flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
                 channel.ack(records);
+                pipelineJobPersistCallback.pushPersistEvent();
                 if (0 == round % 50) {
                     log.info("importer write, round={}, rowCount={}", round, rowCount);
                 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 568c0620242..161ed5fdfbb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -67,7 +68,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
     
     public IncrementalTask(final int concurrency, final DumperConfiguration dumperConfig, final ImporterConfiguration importerConfig,
                            final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
-                           final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine) {
+                           final PipelineTableMetaDataLoader sourceMetaDataLoader, final ExecuteEngine incrementalDumperExecuteEngine, final PipelineJobPersistCallback pipelineJobPersistCallback) {
         this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
         taskId = dumperConfig.getDataSourceName();
         progress = new IncrementalTaskProgress();
@@ -75,7 +76,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
         progress.setPosition(position);
         channel = createChannel(concurrency, pipelineChannelCreator, progress);
         dumper = DumperFactory.createIncrementalDumper(dumperConfig, position, channel, sourceMetaDataLoader);
-        importers = createImporters(concurrency, importerConfig, dataSourceManager, channel);
+        importers = createImporters(concurrency, importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
     }
     
     @Override
@@ -86,10 +87,11 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
         waitForResult(future);
     }
     
-    private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+    private Collection<Importer> createImporters(final int concurrency, final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+                                                 final PipelineJobPersistCallback pipelineJobPersistCallback) {
         Collection<Importer> result = new LinkedList<>();
         for (int i = 0; i < concurrency; i++) {
-            result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel));
+            result.add(ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index ea115a4b734..b04caf2da2b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
@@ -67,12 +68,12 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
     public InventoryTask(final InventoryDumperConfiguration inventoryDumperConfig, final ImporterConfiguration importerConfig,
                          final PipelineChannelCreator pipelineChannelCreator, final PipelineDataSourceManager dataSourceManager,
                          final DataSource sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader,
-                         final ExecuteEngine importerExecuteEngine) {
+                         final ExecuteEngine importerExecuteEngine, final PipelineJobPersistCallback pipelineJobPersistCallback) {
         this.importerExecuteEngine = importerExecuteEngine;
         taskId = generateTaskId(inventoryDumperConfig);
         channel = createChannel(pipelineChannelCreator);
         dumper = DumperFactory.createInventoryDumper(inventoryDumperConfig, channel, sourceDataSource, sourceMetaDataLoader);
-        importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel);
+        importer = ImporterFactory.createImporter(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
         position = inventoryDumperConfig.getPosition();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
new file mode 100644
index 00000000000..dcda0743b23
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistContext;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Rule altered job persist service.
+ */
+
+@Slf4j
+public final class RuleAlteredJobPersistService {
+    
+    private static final Map<String, Map<Integer, PipelineJobPersistContext>> JOB_PERSIST_MAP = new ConcurrentHashMap<>();
+    
+    private static final GovernanceRepositoryAPI REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
+    
+    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+    
+    static {
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, 1, TimeUnit.SECONDS);
+    }
+    
+    /**
+     * Remove job schedule parameter by job id.
+     *
+     * @param jobId job id
+     */
+    public static void removeJobPersistParameter(final String jobId) {
+        log.info("Remove job persist, job id: {}", jobId);
+        JOB_PERSIST_MAP.remove(jobId);
+    }
+    
+    /**
+     * Add job schedule parameter.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    public static void addJobPersistParameter(final String jobId, final int shardingItem) {
+        log.info("Add job schedule, jobId={}, shardingItem={}", jobId, shardingItem);
+        JOB_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobPersistContext(jobId, shardingItem));
+    }
+    
+    /**
+     * Persist job process, may not be implemented immediately, depending on persist interval.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    public static void triggerPersist(final String jobId, final int shardingItem) {
+        Map<Integer, PipelineJobPersistContext> intervalParamMap = JOB_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
+        PipelineJobPersistContext parameter = intervalParamMap.get(shardingItem);
+        if (null == parameter) {
+            log.debug("Persist interval parameter is null, jobId={}, shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        parameter.getAlreadyPersisted().compareAndSet(true, false);
+    }
+    
+    private static void persist(final String jobId, final int shardingItem, final long persistTimeMillis, final PipelineJobPersistContext param) {
+        Map<Integer, RuleAlteredJobScheduler> schedulerMap = RuleAlteredJobSchedulerCenter.getJobSchedulerMap(jobId);
+        RuleAlteredJobScheduler scheduler = schedulerMap.get(shardingItem);
+        if (null == scheduler) {
+            log.warn("job schedule not exists, job id: {}, sharding item: {}", jobId, shardingItem);
+            return;
+        }
+        log.info("execute persist, job id={}, sharding item={}, persistTimeMillis={}", jobId, shardingItem, persistTimeMillis);
+        REPOSITORY_API.persistJobProgress(scheduler.getJobContext());
+        param.getAlreadyPersisted().set(true);
+    }
+    
+    private static final class PersistJobContextRunnable implements Runnable {
+        
+        @Override
+        public void run() {
+            long currentTimeMillis = System.currentTimeMillis();
+            for (Entry<String, Map<Integer, PipelineJobPersistContext>> entry : JOB_PERSIST_MAP.entrySet()) {
+                entry.getValue().forEach((shardingItem, param) -> {
+                    AtomicBoolean alreadyPersisted = param.getAlreadyPersisted();
+                    if (alreadyPersisted.get()) {
+                        return;
+                    }
+                    persist(entry.getKey(), shardingItem, currentTimeMillis, param);
+                    alreadyPersisted.set(true);
+                });
+            }
+        }
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 6c56ec6179c..03404707be8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -17,13 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
-import java.sql.SQLException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import javax.sql.DataSource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
@@ -61,6 +54,15 @@ import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.lock.LockScope;
 import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Rule altered job preparer.
@@ -191,8 +193,9 @@ public final class RuleAlteredJobPreparer {
         PipelineDataSourceManager dataSourceManager = jobContext.getDataSourceManager();
         taskConfig.getDumperConfig().setPosition(getIncrementalPosition(jobContext, taskConfig, dataSourceManager));
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobContext.getSourceMetaDataLoader();
+        AsyncPipelineJobPersistCallback persistCallback = new AsyncPipelineJobPersistCallback(jobContext.getJobId(), jobContext.getShardingItem());
         IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getJobConfig().getConcurrency(),
-                taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine);
+                taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, persistCallback);
         jobContext.getIncrementalTasks().add(incrementalTask);
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 6b0f22b27b6..620f3ec611a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -21,16 +21,10 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Rule altered job scheduler center.
@@ -42,10 +36,14 @@ public final class RuleAlteredJobSchedulerCenter {
     
     private static final Map<String, Map<Integer, RuleAlteredJobScheduler>> JOB_SCHEDULER_MAP = new ConcurrentHashMap<>();
     
-    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-persist-%d"));
-    
-    static {
-        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 10, 10, TimeUnit.SECONDS);
+    /**
+     * Get job scheduler map.
+     *
+     * @param jobId job id
+     * @return job scheduler
+     */
+    public static Map<Integer, RuleAlteredJobScheduler> getJobSchedulerMap(final String jobId) {
+        return JOB_SCHEDULER_MAP.computeIfAbsent(jobId, k -> new ConcurrentHashMap<>());
     }
     
     /**
@@ -65,6 +63,7 @@ public final class RuleAlteredJobSchedulerCenter {
         RuleAlteredJobScheduler jobScheduler = new RuleAlteredJobScheduler(jobContext);
         jobScheduler.start();
         schedulerMap.put(shardingItem, jobScheduler);
+        RuleAlteredJobPersistService.addJobPersistParameter(jobId, shardingItem);
     }
     
     /**
@@ -83,6 +82,7 @@ public final class RuleAlteredJobSchedulerCenter {
             entry.getValue().stop();
         }
         JOB_SCHEDULER_MAP.remove(jobId);
+        RuleAlteredJobPersistService.removeJobPersistParameter(jobId);
     }
     
     /**
@@ -101,21 +101,4 @@ public final class RuleAlteredJobSchedulerCenter {
             entry.getValue().getJobContext().setStatus(jobStatus);
         }
     }
-    
-    private static final class PersistJobContextRunnable implements Runnable {
-        
-        @Override
-        public void run() {
-            GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
-            for (Entry<String, Map<Integer, RuleAlteredJobScheduler>> entry : JOB_SCHEDULER_MAP.entrySet()) {
-                try {
-                    entry.getValue().forEach((shardingItem, jobScheduler) -> repositoryAPI.persistJobProgress(jobScheduler.getJobContext()));
-                    // CHECKSTYLE:OFF
-                } catch (final Exception ex) {
-                    // CHECKSTYLE:ON
-                    log.error("persist job {} context failed.", entry.getKey(), ex);
-                }
-            }
-        }
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 81d17554660..2dc4079d45f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -46,6 +46,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJ
 import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration.InputConfiguration;
+import org.apache.shardingsphere.scaling.core.job.persist.AsyncPipelineJobPersistCallback;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -76,8 +77,9 @@ public final class InventoryTaskSplitter {
         DataSource dataSource = jobContext.getSourceDataSource();
         PipelineTableMetaDataLoader metaDataLoader = jobContext.getSourceMetaDataLoader();
         ExecuteEngine importerExecuteEngine = jobContext.getRuleAlteredContext().getImporterExecuteEngine();
+        AsyncPipelineJobPersistCallback persistCallback = new AsyncPipelineJobPersistCallback(jobContext.getJobId(), jobContext.getShardingItem());
         for (InventoryDumperConfiguration each : splitDumperConfig(jobContext, taskConfig.getDumperConfig())) {
-            result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine));
+            result.add(new InventoryTask(each, taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, dataSource, metaDataLoader, importerExecuteEngine, persistCallback));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
index 430496219ca..16999111f08 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterFactory.java
@@ -22,6 +22,7 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
@@ -41,13 +42,16 @@ public final class ImporterFactory {
      * @param importerConfig importer configuration
      * @param dataSourceManager data source manager
      * @param channel channel
+     * @param pipelineJobPersistCallback job persist callback
      * @return importer
      */
     @SneakyThrows(ReflectiveOperationException.class)
-    public static Importer createImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+    public static Importer createImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+                                          final PipelineJobPersistCallback pipelineJobPersistCallback) {
         String databaseType = importerConfig.getDataSourceConfig().getDatabaseType().getType();
         ScalingEntry scalingEntry = ScalingEntryFactory.getInstance(databaseType);
-        Constructor<? extends Importer> constructor = scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class, PipelineDataSourceManager.class, PipelineChannel.class);
-        return constructor.newInstance(importerConfig, dataSourceManager, channel);
+        Constructor<? extends Importer> constructor = scalingEntry.getImporterClass().getConstructor(ImporterConfiguration.class, PipelineDataSourceManager.class, PipelineChannel.class,
+                PipelineJobPersistCallback.class);
+        return constructor.newInstance(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
similarity index 54%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
index 214ad50cccf..08b2b1336a6 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/persist/AsyncPipelineJobPersistCallback.java
@@ -15,28 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.fixture;
+package org.apache.shardingsphere.scaling.core.job.persist;
 
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPersistService;
 
-public final class FixtureImporter implements Importer {
+/**
+ * Async job process persist callback.
+ */
+@RequiredArgsConstructor
+public final class AsyncPipelineJobPersistCallback implements PipelineJobPersistCallback {
     
-    public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-    }
+    private final String jobId;
+    
+    private final int shardingItem;
     
     @Override
-    public void start() {
+    public String getJobId() {
+        return jobId;
     }
     
     @Override
-    public void stop() {
+    public int getShardingItem() {
+        return shardingItem;
     }
     
     @Override
-    public void run() {
-        start();
+    public void pushPersistEvent() {
+        RuleAlteredJobPersistService.triggerPersist(jobId, shardingItem);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
index 843c6448057..002f0869450 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/importer/MySQLImporter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
 
@@ -27,8 +28,9 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
  */
 public final class MySQLImporter extends AbstractImporter {
     
-    public MySQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(importerConfig, dataSourceManager, channel);
+    public MySQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+                         final PipelineJobPersistCallback pipelineJobPersistCallback) {
+        super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
index 6d5be6db7d1..c864e5c04a4 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/importer/OpenGaussImporter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.opengauss.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -28,8 +29,9 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
  */
 public final class OpenGaussImporter extends AbstractImporter {
     
-    public OpenGaussImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(importerConfig, dataSourceManager, channel);
+    public OpenGaussImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+                             final PipelineJobPersistCallback pipelineJobPersistCallback) {
+        super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
index 8e96369d70c..26c1bac7aa6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/importer/PostgreSQLImporter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
@@ -28,8 +29,9 @@ import org.apache.shardingsphere.data.pipeline.core.importer.AbstractImporter;
  */
 public final class PostgreSQLImporter extends AbstractImporter {
     
-    public PostgreSQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-        super(importerConfig, dataSourceManager, channel);
+    public PostgreSQLImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+                              final PipelineJobPersistCallback pipelineJobPersistCallback) {
+        super(importerConfig, dataSourceManager, channel, pipelineJobPersistCallback);
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 17da802eeb9..e53c0880a90 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -23,7 +23,6 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
@@ -53,11 +52,11 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -351,8 +350,9 @@ public abstract class BaseITCase {
             TimeUnit.SECONDS.timedJoin(increaseTaskThread, 60);
         }
         log.info("jobId: {}", jobId);
-        Map<String, String> actualStatusMap = new HashMap<>(2, 1);
+        Set<String> actualStatus = null;
         for (int i = 0; i < 15; i++) {
+            actualStatus = new HashSet<>();
             List<Map<String, Object>> showScalingStatusResMap = showScalingStatus(jobId);
             log.info("show scaling status result: {}", showScalingStatusResMap);
             boolean finished = true;
@@ -361,8 +361,7 @@ public abstract class BaseITCase {
                 assertThat(status, not(JobStatus.PREPARING_FAILURE.name()));
                 assertThat(status, not(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()));
                 assertThat(status, not(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()));
-                String datasourceName = entry.get("data_source").toString();
-                actualStatusMap.put(datasourceName, status);
+                actualStatus.add(status);
                 if (!Objects.equals(status, JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
                     finished = false;
                     break;
@@ -374,7 +373,7 @@ public abstract class BaseITCase {
             assertBeforeApplyScalingMetadataCorrectly();
             ThreadUtil.sleep(4, TimeUnit.SECONDS);
         }
-        assertThat(actualStatusMap.values().stream().filter(StringUtils::isNotBlank).collect(Collectors.toSet()), is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));
+        assertThat(actualStatus, is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));
     }
     
     protected List<Map<String, Object>> showScalingStatus(final String jobId) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index aee49a75fdc..12cfdc7c982 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -159,7 +160,7 @@ public final class GovernanceRepositoryAPIImplTest {
         PipelineDataSourceWrapper dataSource = mock(PipelineDataSourceWrapper.class);
         PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
         return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
-                new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine());
+                new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
     }
     
     private IncrementalTask mockIncrementalTask(final TaskConfiguration taskConfig) {
@@ -167,6 +168,6 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         return new IncrementalTask(3, dumperConfig, taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
-                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
+                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index 214ad50cccf..4932eb50cea 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -19,12 +19,14 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 
 public final class FixtureImporter implements Importer {
     
-    public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
+    public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel,
+                           final PipelineJobPersistCallback pipelineJobPersistCallback) {
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
similarity index 59%
copy from shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
copy to shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
index 214ad50cccf..00e1a3f30d8 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobPersistCallback.java
@@ -17,26 +17,21 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.PipelineJobPersistCallback;
 
-public final class FixtureImporter implements Importer {
-    
-    public FixtureImporter(final ImporterConfiguration importerConfig, final PipelineDataSourceManager dataSourceManager, final PipelineChannel channel) {
-    }
+public final class FixturePipelineJobPersistCallback implements PipelineJobPersistCallback {
     
     @Override
-    public void start() {
+    public String getJobId() {
+        return null;
     }
     
     @Override
-    public void stop() {
+    public int getShardingItem() {
+        return 0;
     }
     
     @Override
-    public void run() {
-        start();
+    public void pushPersistEvent() {
     }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
index 6b1b19fabea..14406182f8b 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/AbstractImporterTest.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -81,7 +82,7 @@ public final class AbstractImporterTest {
     
     @Before
     public void setUp() throws SQLException {
-        jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel) {
+        jdbcImporter = new AbstractImporter(mockImporterConfiguration(), dataSourceManager, channel, new FixturePipelineJobPersistCallback()) {
             
             @Override
             protected String getSchemaName(final String logicTableName) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index b9fe4b5aade..af11ca297ec 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -54,7 +55,7 @@ public final class IncrementalTaskTest {
         PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), taskConfig.getImporterConfig(),
                 PipelineContextUtil.getPipelineChannelCreator(),
-                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine());
+                new PipelineDataSourceManager(), metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback());
     }
     
     @Test
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 45583ff664b..dc14ac4d84c 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobPersistCallback;
 import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -72,7 +73,7 @@ public final class InventoryTaskTest {
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(),
-                        DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
+                        DATA_SOURCE_MANAGER, dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback())) {
             inventoryTask.start();
         }
     }
@@ -87,7 +88,7 @@ public final class InventoryTaskTest {
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(),
-                        new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine())) {
+                        new PipelineDataSourceManager(), dataSource, metaDataLoader, PipelineContextUtil.getExecuteEngine(), new FixturePipelineJobPersistCallback())) {
             inventoryTask.start();
             assertThat(inventoryTask.getProgress().getPosition(), instanceOf(IntegerPrimaryKeyPosition.class));
         }