You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2020/12/29 13:59:32 UTC

[shardingsphere] branch master updated: Optimize job progress (#8807)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bb4bbda  Optimize job progress (#8807)
bb4bbda is described below

commit bb4bbdae05bd0fd2a65ebfd5492ccb46c0389c3d
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Tue Dec 29 21:59:07 2020 +0800

    Optimize job progress (#8807)
    
    * Optimize job progress
    
    * Optimize job progress
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../user-manual/shardingsphere-scaling/usage.cn.md      | 10 +++-------
 .../user-manual/shardingsphere-scaling/usage.en.md      | 10 +++-------
 .../shardingsphere/scaling/web/HttpServerHandler.java   |  3 ++-
 .../shardingsphere/scaling/core/job/JobProgress.java    | 11 ++++++-----
 .../job/task/incremental/IncrementalTaskProgress.java   |  9 ++++++---
 .../InventoryTaskGroupProgress.java}                    | 16 +++++++---------
 .../scaling/core/schedule/ScalingTaskScheduler.java     | 13 ++++++-------
 .../core/service/impl/DistributedScalingJobService.java | 17 +++++++++--------
 .../core/service/impl/StandaloneScalingJobService.java  | 12 ++++++++++--
 .../scaling/core/utils/ScalingTaskUtil.java             |  7 ++-----
 .../service/impl/DistributedScalingJobServiceTest.java  | 14 +++++---------
 11 files changed, 59 insertions(+), 63 deletions(-)

diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
index 211f1aa..1baf0b2 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
@@ -260,7 +260,7 @@ curl -X GET \
 
 #### 停止迁移任务
 
-接口描述:POST /scaling/job/stop
+接口描述:GET /scaling/job/stop/{jobId}
 
 请求体:
 
@@ -270,12 +270,8 @@ curl -X GET \
 
 示例:
 ```
-curl -X POST \
-  http://localhost:8888/scaling/job/stop \
-  -H 'content-type: application/json' \
-  -d '{
-   "jobId":1
-}'
+curl -X GET \
+  http://localhost:8888/scaling/job/stop/1
 ```
 返回信息:
 ```
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index eb1ebe5..447a6c3 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -259,7 +259,7 @@ Response:
 
 #### Stop scaling job
 
-Interface description:POST /scaling/job/stop
+Interface description:GET /scaling/job/stop/{jobId}
 
 Body:
 
@@ -269,12 +269,8 @@ Body:
 
 Example:
 ```
-curl -X POST \
-  http://localhost:8888/scaling/job/stop \
-  -H 'content-type: application/json' \
-  -d '{
-   "jobId":1
-}'
+curl -X GET \
+  http://localhost:8888/scaling/job/stop/1
 ```
 Response:
 ```
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 5e6ba31..1b4a26d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.web;
 import com.google.common.base.Preconditions;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.LongSerializationPolicy;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -51,7 +52,7 @@ import java.util.Optional;
 @Slf4j
 public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
     
-    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+    private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().setLongSerializationPolicy(LongSerializationPolicy.STRING).create();
     
     private static final ScalingJobService SCALING_JOB_SERVICE = ScalingJobServiceFactory.getInstance();
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobProgress.java
index f4ce959..f651c16 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobProgress.java
@@ -17,12 +17,13 @@
 
 package org.apache.shardingsphere.scaling.core.job;
 
-import com.google.common.collect.Maps;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskGroupProgress;
 
-import java.util.Collection;
-import java.util.Map;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Job progress.
@@ -35,7 +36,7 @@ public final class JobProgress {
     
     private final String status;
     
-    private final Map<String, Collection<TaskProgress>> inventoryTaskProgress = Maps.newHashMap();
+    private final List<InventoryTaskGroupProgress> inventoryTaskProgress = new LinkedList<>();
     
-    private final Map<String, Collection<TaskProgress>> incrementalTaskProgress = Maps.newHashMap();
+    private final List<IncrementalTaskProgress> incrementalTaskProgress = new LinkedList<>();
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
index ce33850..6425d54 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
@@ -17,21 +17,24 @@
 
 package org.apache.shardingsphere.scaling.core.job.task.incremental;
 
-import org.apache.shardingsphere.scaling.core.job.TaskProgress;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.TaskProgress;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
 
 /**
  * Incremental task progress.
  */
 @Getter
 @RequiredArgsConstructor
+@AllArgsConstructor
 public final class IncrementalTaskProgress implements TaskProgress {
     
     private final String id;
     
+    private String shardingItem;
+    
     private final long delayMillisecond;
     
     private final Position<?> position;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskGroupProgress.java
similarity index 73%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskGroupProgress.java
index ce33850..ef2de2b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskGroupProgress.java
@@ -15,24 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.task.incremental;
-
-import org.apache.shardingsphere.scaling.core.job.TaskProgress;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
+package org.apache.shardingsphere.scaling.core.job.task.inventory;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.TaskProgress;
 
 /**
- * Incremental task progress.
+ * Inventory task group progress.
  */
 @Getter
 @RequiredArgsConstructor
-public final class IncrementalTaskProgress implements TaskProgress {
+public final class InventoryTaskGroupProgress implements TaskProgress {
     
-    private final String id;
+    private final String shardingItem;
     
-    private final long delayMillisecond;
+    private final int total;
     
-    private final Position<?> position;
+    private final int finished;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index c71ff86..5932d96 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -22,8 +22,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.TaskProgress;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
 import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 
 import java.util.Collection;
@@ -135,10 +136,8 @@ public final class ScalingTaskScheduler implements Runnable {
      *
      * @return all inventory data task progress
      */
-    public Collection<TaskProgress> getInventoryTaskProgress() {
-        return scalingJob.getInventoryTasks().stream()
-                .map(ScalingTask::getProgress)
-                .collect(Collectors.toList());
+    public Collection<InventoryTaskProgress> getInventoryTaskProgress() {
+        return scalingJob.getInventoryTasks().stream().map(each -> (InventoryTaskProgress) each.getProgress()).collect(Collectors.toList());
     }
     
     /**
@@ -146,7 +145,7 @@ public final class ScalingTaskScheduler implements Runnable {
      *
      * @return all incremental data task progress
      */
-    public Collection<TaskProgress> getIncrementalTaskProgress() {
-        return scalingJob.getIncrementalTasks().stream().map(ScalingTask::getProgress).collect(Collectors.toList());
+    public Collection<IncrementalTaskProgress> getIncrementalTaskProgress() {
+        return scalingJob.getIncrementalTasks().stream().map(each -> (IncrementalTaskProgress) each.getProgress()).collect(Collectors.toList());
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
index 07d07d9..bac4f70 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.TaskProgress;
 import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionGroup;
 import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskGroupProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
 import org.apache.shardingsphere.scaling.core.service.AbstractScalingJobService;
 import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
@@ -98,25 +99,25 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
         JobProgress result = new JobProgress(jobId, running ? "RUNNING" : "STOPPED");
         List<String> shardingItems = REGISTRY_REPOSITORY.getChildrenKeys(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.POSITION));
         for (String each : shardingItems) {
-            result.getInventoryTaskProgress().put(each, getInventoryTaskProgress(jobId, each));
-            result.getIncrementalTaskProgress().put(each, getIncrementalTaskProgress(jobId, each));
+            result.getInventoryTaskProgress().add(getInventoryTaskProgress(jobId, each));
+            result.getIncrementalTaskProgress().addAll(getIncrementalTaskProgress(jobId, each));
         }
         return result;
     }
     
-    private List<TaskProgress> getInventoryTaskProgress(final long jobId, final String shardingItem) {
+    private InventoryTaskGroupProgress getInventoryTaskProgress(final long jobId, final String shardingItem) {
         InventoryPositionGroup inventoryPositionGroup = InventoryPositionGroup.fromJson(
                 REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.POSITION, shardingItem, ScalingConstant.INVENTORY)));
-        List<TaskProgress> result = inventoryPositionGroup.getUnfinished().keySet().stream().map(each -> new InventoryTaskProgress(each, false)).collect(Collectors.toList());
-        result.addAll(inventoryPositionGroup.getFinished().stream().map(each -> new InventoryTaskProgress(each, true)).collect(Collectors.toList()));
-        return result;
+        List<TaskProgress> unfinished = inventoryPositionGroup.getUnfinished().keySet().stream().map(each -> new InventoryTaskProgress(each, false)).collect(Collectors.toList());
+        List<TaskProgress> finished = inventoryPositionGroup.getFinished().stream().map(each -> new InventoryTaskProgress(each, true)).collect(Collectors.toList());
+        return new InventoryTaskGroupProgress(shardingItem, unfinished.size() + finished.size(), finished.size());
     }
     
-    private List<TaskProgress> getIncrementalTaskProgress(final long jobId, final String shardingItem) {
+    private List<IncrementalTaskProgress> getIncrementalTaskProgress(final long jobId, final String shardingItem) {
         String position = REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.POSITION, shardingItem, ScalingConstant.INCREMENTAL));
         JsonObject jsonObject = GSON.fromJson(position, JsonObject.class);
         return jsonObject.entrySet().stream()
-                .map(entry -> new IncrementalTaskProgress(entry.getKey(), entry.getValue().getAsJsonObject().get(ScalingConstant.DELAY).getAsLong(), null))
+                .map(entry -> new IncrementalTaskProgress(entry.getKey(), shardingItem, entry.getValue().getAsJsonObject().get(ScalingConstant.DELAY).getAsLong(), null))
                 .collect(Collectors.toList());
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
index 9363a4a..dff7a80 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
@@ -22,11 +22,14 @@ import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundExcept
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.preparer.ScalingJobPreparer;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskGroupProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
 import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
 import org.apache.shardingsphere.scaling.core.schedule.ScalingTaskScheduler;
 import org.apache.shardingsphere.scaling.core.service.AbstractScalingJobService;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
 
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -81,12 +84,17 @@ public final class StandaloneScalingJobService extends AbstractScalingJobService
     public JobProgress getProgress(final long jobId) {
         JobProgress result = new JobProgress(jobId, getJob(jobId).getStatus());
         if (scalingTaskSchedulerMap.containsKey(jobId)) {
-            result.getInventoryTaskProgress().put("0", scalingTaskSchedulerMap.get(jobId).getInventoryTaskProgress());
-            result.getIncrementalTaskProgress().put("0", scalingTaskSchedulerMap.get(jobId).getIncrementalTaskProgress());
+            result.getInventoryTaskProgress().add(getInventoryTaskProgress(jobId));
+            result.getIncrementalTaskProgress().addAll(scalingTaskSchedulerMap.get(jobId).getIncrementalTaskProgress());
         }
         return result;
     }
     
+    private InventoryTaskGroupProgress getInventoryTaskProgress(final long jobId) {
+        Collection<InventoryTaskProgress> inventoryTaskProgress = scalingTaskSchedulerMap.get(jobId).getInventoryTaskProgress();
+        return new InventoryTaskGroupProgress("", inventoryTaskProgress.size(), (int) inventoryTaskProgress.stream().filter(InventoryTaskProgress::isFinished).count());
+    }
+    
     @Override
     public void remove(final long jobId) {
         stop(jobId);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index c62f2de..778fd13 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -22,9 +22,7 @@ import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
 import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
 import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
-import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
 
 import java.util.Collection;
 
@@ -51,9 +49,8 @@ public final class ScalingTaskUtil {
      * @return almost finished or not
      */
     public static boolean allTasksAlmostFinished(final JobProgress jobProgress, final JobConfiguration jobConfig) {
-        return jobProgress.getInventoryTaskProgress().values().stream().flatMap(Collection::stream).allMatch(each -> ((InventoryTaskProgress) each).isFinished())
-                && jobProgress.getIncrementalTaskProgress().values().stream().flatMap(Collection::stream)
-                .allMatch(each -> ((IncrementalTaskProgress) each).getDelayMillisecond() < jobConfig.getAllowDelay());
+        return jobProgress.getInventoryTaskProgress().stream().allMatch(each -> each.getTotal() == each.getFinished())
+                && jobProgress.getIncrementalTaskProgress().stream().allMatch(each -> each.getDelayMillisecond() < jobConfig.getAllowDelay());
     }
     
     /**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index c886d3b..1fa422d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -29,8 +29,6 @@ import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
-import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
 import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
 import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
@@ -120,13 +118,11 @@ public final class DistributedScalingJobServiceTest {
         registryRepository.persist(ScalingTaskUtil.getScalingListenerPath("1/position/1/incremental"),
                 "{'ds2':{'filename':binlog1,'position':4,'delay':2},'ds4':{'filename':binlog2,'position':4,'delay':4}}");
         JobProgress actual = scalingJobService.getProgress(1);
-        assertThat(actual.getInventoryTaskProgress().get("0").stream()
-                .map(each -> (InventoryTaskProgress) each)
-                .filter(InventoryTaskProgress::isFinished).count(), is(2L));
-        assertTrue(actual.getIncrementalTaskProgress().get("1").stream()
-                .map(each -> (IncrementalTaskProgress) each)
-                .filter(each -> "ds2".equals(each.getId()))
-                .allMatch(each -> 2 == each.getDelayMillisecond()));
+        assertThat(actual.getInventoryTaskProgress().size(), is(2));
+        assertThat(actual.getIncrementalTaskProgress().size(), is(4));
+        assertThat(actual.getInventoryTaskProgress().get(0).getTotal(), is(5));
+        assertThat(actual.getInventoryTaskProgress().get(0).getFinished(), is(2));
+        assertThat(actual.getIncrementalTaskProgress().get(0).getDelayMillisecond(), is(1L));
     }
     
     @Test