You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/16 12:18:17 UTC

[shardingsphere] branch master updated: Show migration status add processed_records_count column (#21019)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 902cff301f0 Show migration status add processed_records_count column (#21019)
902cff301f0 is described below

commit 902cff301f0a6ea122a33dae0616c906e27f7193
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Sep 16 20:18:00 2022 +0800

    Show migration status add processed_records_count column (#21019)
    
    * Delete listener
    
    * Show migration status add processed records column
    
    * Improve name
    
    * Fix ci
---
 .../ShowMigrationJobStatusQueryResultSet.java      |  4 ++-
 .../InventoryIncrementalJobItemProgress.java       |  2 ++
 .../listener/PipelineJobProgressListener.java      |  4 ++-
 ...va => PipelineJobProgressUpdatedParameter.java} | 16 +++++----
 .../impl/InventoryIncrementalJobItemAPIImpl.java   |  1 +
 .../InventoryIncrementalJobItemContext.java        | 10 +++++-
 .../pipeline/core/importer/DefaultImporter.java    | 18 ++++++----
 .../DefaultPipelineJobProgressListener.java        | 38 ----------------------
 .../YamlInventoryIncrementalJobItemProgress.java   |  2 ++
 ...InventoryIncrementalJobItemProgressSwapper.java |  2 ++
 .../core/prepare/InventoryTaskSplitter.java        |  4 +--
 .../migration/MigrationJobItemContext.java         | 20 ++++++++++++
 .../scenario/migration/MigrationJobPreparer.java   |  4 +--
 .../data/pipeline/cases/base/BaseITCase.java       |  6 ++--
 .../migration/general/MySQLMigrationGeneralIT.java |  7 +++-
 .../FixturePipelineJobProgressListener.java        |  3 +-
 16 files changed, 78 insertions(+), 63 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index 4256fc45793..78eb9d3f80d 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -51,6 +51,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
                         result.add(entry.getValue().getDataSourceName());
                         result.add(entry.getValue().getStatus());
                         result.add(entry.getValue().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
+                        result.add(entry.getValue().getProcessedRecordsCount());
                         result.add(entry.getValue().getInventory().getInventoryFinishedPercentage());
                         long latestActiveTimeMillis = entry.getValue().getIncremental().getIncrementalLatestActiveTimeMillis();
                         result.add(latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0);
@@ -60,6 +61,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
                         result.add("");
                         result.add("");
                         result.add("");
+                        result.add("");
                     }
                     return result;
                 }).collect(Collectors.toList()).iterator();
@@ -67,7 +69,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("item", "data_source", "status", "active", "inventory_finished_percentage", "incremental_idle_seconds");
+        return Arrays.asList("item", "data_source", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds");
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
index 4f22da7dfaf..7d08abc8788 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
@@ -39,4 +39,6 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
     private JobItemInventoryTasksProgress inventory;
     
     private JobItemIncrementalTasksProgress incremental;
+    
+    private long processedRecordsCount;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
index 4ca18491ef8..b8fadbde343 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
@@ -24,6 +24,8 @@ public interface PipelineJobProgressListener {
     
     /**
      * Emit on progress updated.
+     *
+     * @param parameter process update parameter
      */
-    void onProgressUpdated();
+    void onProgressUpdated(PipelineJobProgressUpdatedParameter parameter);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
similarity index 75%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
index 4ca18491ef8..1a21a14bb7e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressListener.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/listener/PipelineJobProgressUpdatedParameter.java
@@ -17,13 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.api.job.progress.listener;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
 /**
- * Pipeline job progress listener.
+ * Pipeline job process update parameter.
  */
-public interface PipelineJobProgressListener {
+@RequiredArgsConstructor
+@Getter
+public final class PipelineJobProgressUpdatedParameter {
+    
+    private final int insertedRecordsCount;
     
-    /**
-     * Emit on progress updated.
-     */
-    void onProgressUpdated();
+    private final int deletedRecordsCount;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
index ec1302c76df..766fdc24245 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
@@ -55,6 +55,7 @@ public final class InventoryIncrementalJobItemAPIImpl implements PipelineJobItem
         jobItemProgress.setDataSourceName(jobItemContext.getDataSourceName());
         jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
         jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
+        jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
         String value = YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
         PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 36a153ff08d..7d6a930d646 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.context;
 
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 
@@ -26,7 +27,7 @@ import java.util.Collection;
 /**
  * Inventory incremental job item context.
  */
-public interface InventoryIncrementalJobItemContext extends PipelineJobItemContext {
+public interface InventoryIncrementalJobItemContext extends PipelineJobItemContext, PipelineJobProgressListener {
     
     @Override
     InventoryIncrementalProcessContext getJobProcessContext();
@@ -44,4 +45,11 @@ public interface InventoryIncrementalJobItemContext extends PipelineJobItemConte
      * @return incremental tasks
      */
     Collection<IncrementalTask> getIncrementalTasks();
+    
+    /**
+     * Get processed record count.
+     *
+     * @return processed record count.
+     */
+    long getProcessedRecordsCount();
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 558f73aabd8..d5b34f86e22 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
@@ -97,9 +98,9 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
             if (null != records && !records.isEmpty()) {
                 round++;
                 rowCount += records.size();
-                flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
+                PipelineJobProgressUpdatedParameter updatedParameter = flush(dataSourceManager.getDataSource(importerConfig.getDataSourceConfig()), records);
                 channel.ack(records);
-                jobProgressListener.onProgressUpdated();
+                jobProgressListener.onProgressUpdated(updatedParameter);
                 if (0 == round % 50) {
                     log.info("importer write, round={}, rowCount={}", round, rowCount);
                 }
@@ -113,13 +114,18 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
         log.info("importer write done, rowCount={}, finishedByBreak={}", rowCount, finishedByBreak);
     }
     
-    private void flush(final DataSource dataSource, final List<Record> buffer) {
-        List<GroupedDataRecord> groupedDataRecords = MERGER.group(buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
-        groupedDataRecords.forEach(each -> {
+    private PipelineJobProgressUpdatedParameter flush(final DataSource dataSource, final List<Record> buffer) {
+        List<GroupedDataRecord> result = MERGER.group(buffer.stream().filter(each -> each instanceof DataRecord).map(each -> (DataRecord) each).collect(Collectors.toList()));
+        int insertRecordNumber = 0;
+        int deleteRecordNumber = 0;
+        for (GroupedDataRecord each : result) {
+            deleteRecordNumber += null != each.getDeleteDataRecords() ? each.getDeleteDataRecords().size() : 0;
             flushInternal(dataSource, each.getDeleteDataRecords());
+            insertRecordNumber += null != each.getInsertDataRecords() ? each.getInsertDataRecords().size() : 0;
             flushInternal(dataSource, each.getInsertDataRecords());
             flushInternal(dataSource, each.getUpdateDataRecords());
-        });
+        }
+        return new PipelineJobProgressUpdatedParameter(insertRecordNumber, deleteRecordNumber);
     }
     
     private void flushInternal(final DataSource dataSource, final List<DataRecord> buffer) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
deleted file mode 100644
index 97b55c4e48a..00000000000
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/listener/DefaultPipelineJobProgressListener.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.progress.listener;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-
-/**
- * Default pipeline job progress listener implementation.
- */
-@RequiredArgsConstructor
-public final class DefaultPipelineJobProgressListener implements PipelineJobProgressListener {
-    
-    private final String jobId;
-    
-    private final int shardingItem;
-    
-    @Override
-    public void onProgressUpdated() {
-        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
-    }
-}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
index de052fe82ef..956a90506a6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
@@ -37,4 +37,6 @@ public final class YamlInventoryIncrementalJobItemProgress implements YamlConfig
     private YamlJobItemInventoryTasksProgress inventory;
     
     private YamlJobItemIncrementalTasksProgress incremental;
+    
+    private long processedRecordsCount;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 6624e8bd750..a7e89170ef3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -38,6 +38,7 @@ public final class YamlInventoryIncrementalJobItemProgressSwapper implements Yam
         result.setDataSourceName(progress.getDataSourceName());
         result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToYaml(progress.getInventory()));
         result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToYaml(progress.getIncremental()));
+        result.setProcessedRecordsCount(progress.getProcessedRecordsCount());
         return result;
     }
     
@@ -49,6 +50,7 @@ public final class YamlInventoryIncrementalJobItemProgressSwapper implements Yam
         result.setDataSourceName(yamlProgress.getDataSourceName());
         result.setInventory(INVENTORY_PROGRESS_SWAPPER.swapToObject(yamlProgress.getInventory()));
         result.setIncremental(INCREMENTAL_PROGRESS_SWAPPER.swapToObject(yamlProgress.getSourceDatabaseType(), yamlProgress.getIncremental()));
+        result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
         return result;
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index e116ec9e290..23ce1729373 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -37,7 +37,6 @@ import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
@@ -84,10 +83,9 @@ public final class InventoryTaskSplitter {
     public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemContext jobItemContext) {
         List<InventoryTask> result = new LinkedList<>();
         PipelineChannelCreator pipelineChannelCreator = jobItemContext.getJobProcessContext().getPipelineChannelCreator();
-        DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         for (InventoryDumperConfiguration each : splitDumperConfig(jobItemContext, dumperConfig)) {
             result.add(new InventoryTask(each, importerConfig, pipelineChannelCreator, dataSourceManager, sourceDataSource, metaDataLoader, importerExecuteEngine,
-                    jobProgressListener));
+                    jobItemContext));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index ca19765cf46..80240a5bd97 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -28,14 +28,17 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Migration job item context.
@@ -63,6 +66,8 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     
     private final Collection<IncrementalTask> incrementalTasks = new LinkedList<>();
     
+    private final AtomicLong processedRecordsCount = new AtomicLong(0);
+    
     private final MigrationJobConfiguration jobConfig;
     
     private final MigrationProcessContext jobProcessContext;
@@ -92,6 +97,9 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
         this.shardingItem = shardingItem;
         this.dataSourceName = taskConfig.getDataSourceName();
         this.initProgress = initProgress;
+        if (null != initProgress) {
+            processedRecordsCount.set(initProgress.getProcessedRecordsCount());
+        }
         this.jobProcessContext = jobProcessContext;
         this.taskConfig = taskConfig;
         this.dataSourceManager = dataSourceManager;
@@ -125,4 +133,16 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     public boolean isSourceTargetDatabaseTheSame() {
         return jobConfig.getSourceDatabaseType().equalsIgnoreCase(jobConfig.getTargetDatabaseType());
     }
+    
+    @Override
+    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
+        int needAddNumber = parameter.getInsertedRecordsCount() - parameter.getDeletedRecordsCount();
+        processedRecordsCount.addAndGet(needAddNumber);
+        PipelineJobProgressPersistService.notifyPersist(jobId, shardingItem);
+    }
+    
+    @Override
+    public long getProcessedRecordsCount() {
+        return processedRecordsCount.get();
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index daa6ace0229..48071fee418 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
@@ -163,9 +162,8 @@ public final class MigrationJobPreparer {
         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
         taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), dataSourceManager));
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
-        DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
-                taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, jobProgressListener);
+                taskConfig.getDumperConfig(), taskConfig.getImporterConfig(), pipelineChannelCreator, dataSourceManager, sourceMetaDataLoader, incrementalDumperExecuteEngine, jobItemContext);
         jobItemContext.getIncrementalTasks().add(incrementalTask);
     }
     
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 75cb988b2e7..2105107d595 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -51,6 +51,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -264,7 +265,7 @@ public abstract class BaseITCase {
         getIncreaseTaskThread().start();
     }
     
-    protected void waitJobFinished(final String distSQL) throws InterruptedException {
+    protected List<Map<String, Object>> waitJobFinished(final String distSQL) throws InterruptedException {
         if (null != getIncreaseTaskThread()) {
             TimeUnit.SECONDS.timedJoin(getIncreaseTaskThread(), 60);
         }
@@ -276,12 +277,13 @@ public abstract class BaseITCase {
             assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
                     JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
             if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
-                break;
+                return listJobStatus;
             } else if (actualStatus.size() >= 1 && actualStatus.containsAll(new HashSet<>(Arrays.asList("", JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
                 log.warn("one of the shardingItem was not started correctly");
             }
             ThreadUtil.sleep(3, TimeUnit.SECONDS);
         }
+        return Collections.emptyList();
     }
     
     protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) throws SQLException {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 2a9610924d8..5f6c60c08f5 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -38,9 +38,11 @@ import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 /**
  * General migration test case, includes multiple cases.
@@ -105,7 +107,10 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
     }
     
     private void assertMigrationSuccessById(final String jobId) throws SQLException, InterruptedException {
-        waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        List<Map<String, Object>> jobStatus = waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        for (Map<String, Object> each : jobStatus) {
+            assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
+        }
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
index 8a2a7a11702..e70bcbf50d6 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineJobProgressListener.java
@@ -17,11 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
+import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 
 public final class FixturePipelineJobProgressListener implements PipelineJobProgressListener {
     
     @Override
-    public void onProgressUpdated() {
+    public void onProgressUpdated(final PipelineJobProgressUpdatedParameter parameter) {
     }
 }