You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2020/12/29 13:59:32 UTC
[shardingsphere] branch master updated: Optimize job progress (#8807)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bb4bbda Optimize job progress (#8807)
bb4bbda is described below
commit bb4bbdae05bd0fd2a65ebfd5492ccb46c0389c3d
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Tue Dec 29 21:59:07 2020 +0800
Optimize job progress (#8807)
* Optimize job progress
* Optimize job progress
Co-authored-by: qiulu3 <Lucas209910>
---
.../user-manual/shardingsphere-scaling/usage.cn.md | 10 +++-------
.../user-manual/shardingsphere-scaling/usage.en.md | 10 +++-------
.../shardingsphere/scaling/web/HttpServerHandler.java | 3 ++-
.../shardingsphere/scaling/core/job/JobProgress.java | 11 ++++++-----
.../job/task/incremental/IncrementalTaskProgress.java | 9 ++++++---
.../InventoryTaskGroupProgress.java} | 16 +++++++---------
.../scaling/core/schedule/ScalingTaskScheduler.java | 13 ++++++-------
.../core/service/impl/DistributedScalingJobService.java | 17 +++++++++--------
.../core/service/impl/StandaloneScalingJobService.java | 12 ++++++++++--
.../scaling/core/utils/ScalingTaskUtil.java | 7 ++-----
.../service/impl/DistributedScalingJobServiceTest.java | 14 +++++---------
11 files changed, 59 insertions(+), 63 deletions(-)
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
index 211f1aa..1baf0b2 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
@@ -260,7 +260,7 @@ curl -X GET \
#### 停止迁移任务
-接口描述:POST /scaling/job/stop
+接口描述:GET /scaling/job/stop
请求体:
@@ -270,12 +270,8 @@ curl -X GET \
示例:
```
-curl -X POST \
- http://localhost:8888/scaling/job/stop \
- -H 'content-type: application/json' \
- -d '{
- "jobId":1
-}'
+curl -X GET \
+ http://localhost:8888/scaling/job/stop/1
```
返回信息:
```
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index eb1ebe5..447a6c3 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -259,7 +259,7 @@ Response:
#### Stop scaling job
-Interface description:POST /scaling/job/stop
+Interface description:GET /scaling/job/stop
Body:
@@ -269,12 +269,8 @@ Body:
Example:
```
-curl -X POST \
- http://localhost:8888/scaling/job/stop \
- -H 'content-type: application/json' \
- -d '{
- "jobId":1
-}'
+curl -X GET \
+ http://localhost:8888/scaling/job/stop/1
```
Response:
```
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
index 5e6ba31..1b4a26d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/web/HttpServerHandler.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.web;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.LongSerializationPolicy;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -51,7 +52,7 @@ import java.util.Optional;
@Slf4j
public final class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
- private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
+ private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().setLongSerializationPolicy(LongSerializationPolicy.STRING).create();
private static final ScalingJobService SCALING_JOB_SERVICE = ScalingJobServiceFactory.getInstance();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobProgress.java
index f4ce959..f651c16 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/JobProgress.java
@@ -17,12 +17,13 @@
package org.apache.shardingsphere.scaling.core.job;
-import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskGroupProgress;
-import java.util.Collection;
-import java.util.Map;
+import java.util.LinkedList;
+import java.util.List;
/**
* Job progress.
@@ -35,7 +36,7 @@ public final class JobProgress {
private final String status;
- private final Map<String, Collection<TaskProgress>> inventoryTaskProgress = Maps.newHashMap();
+ private final List<InventoryTaskGroupProgress> inventoryTaskProgress = new LinkedList<>();
- private final Map<String, Collection<TaskProgress>> incrementalTaskProgress = Maps.newHashMap();
+ private final List<IncrementalTaskProgress> incrementalTaskProgress = new LinkedList<>();
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
index ce33850..6425d54 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
@@ -17,21 +17,24 @@
package org.apache.shardingsphere.scaling.core.job.task.incremental;
-import org.apache.shardingsphere.scaling.core.job.TaskProgress;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
-
+import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.TaskProgress;
+import org.apache.shardingsphere.scaling.core.job.position.Position;
/**
* Incremental task progress.
*/
@Getter
@RequiredArgsConstructor
+@AllArgsConstructor
public final class IncrementalTaskProgress implements TaskProgress {
private final String id;
+ private String shardingItem;
+
private final long delayMillisecond;
private final Position<?> position;
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskGroupProgress.java
similarity index 73%
copy from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
copy to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskGroupProgress.java
index ce33850..ef2de2b 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/incremental/IncrementalTaskProgress.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryTaskGroupProgress.java
@@ -15,24 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.job.task.incremental;
-
-import org.apache.shardingsphere.scaling.core.job.TaskProgress;
-import org.apache.shardingsphere.scaling.core.job.position.Position;
+package org.apache.shardingsphere.scaling.core.job.task.inventory;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.scaling.core.job.TaskProgress;
/**
- * Incremental task progress.
+ * Inventory task group progress.
*/
@Getter
@RequiredArgsConstructor
-public final class IncrementalTaskProgress implements TaskProgress {
+public final class InventoryTaskGroupProgress implements TaskProgress {
- private final String id;
+ private final String shardingItem;
- private final long delayMillisecond;
+ private final int total;
- private final Position<?> position;
+ private final int finished;
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index c71ff86..5932d96 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -22,8 +22,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import org.apache.shardingsphere.scaling.core.execute.engine.ExecuteCallback;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.TaskProgress;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
+import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
import java.util.Collection;
@@ -135,10 +136,8 @@ public final class ScalingTaskScheduler implements Runnable {
*
* @return all inventory data task progress
*/
- public Collection<TaskProgress> getInventoryTaskProgress() {
- return scalingJob.getInventoryTasks().stream()
- .map(ScalingTask::getProgress)
- .collect(Collectors.toList());
+ public Collection<InventoryTaskProgress> getInventoryTaskProgress() {
+ return scalingJob.getInventoryTasks().stream().map(each -> (InventoryTaskProgress) each.getProgress()).collect(Collectors.toList());
}
/**
@@ -146,7 +145,7 @@ public final class ScalingTaskScheduler implements Runnable {
*
* @return all incremental data task progress
*/
- public Collection<TaskProgress> getIncrementalTaskProgress() {
- return scalingJob.getIncrementalTasks().stream().map(ScalingTask::getProgress).collect(Collectors.toList());
+ public Collection<IncrementalTaskProgress> getIncrementalTaskProgress() {
+ return scalingJob.getIncrementalTasks().stream().map(each -> (IncrementalTaskProgress) each.getProgress()).collect(Collectors.toList());
}
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
index 07d07d9..bac4f70 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobService.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.TaskProgress;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPositionGroup;
import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskGroupProgress;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
import org.apache.shardingsphere.scaling.core.service.AbstractScalingJobService;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
@@ -98,25 +99,25 @@ public final class DistributedScalingJobService extends AbstractScalingJobServic
JobProgress result = new JobProgress(jobId, running ? "RUNNING" : "STOPPED");
List<String> shardingItems = REGISTRY_REPOSITORY.getChildrenKeys(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.POSITION));
for (String each : shardingItems) {
- result.getInventoryTaskProgress().put(each, getInventoryTaskProgress(jobId, each));
- result.getIncrementalTaskProgress().put(each, getIncrementalTaskProgress(jobId, each));
+ result.getInventoryTaskProgress().add(getInventoryTaskProgress(jobId, each));
+ result.getIncrementalTaskProgress().addAll(getIncrementalTaskProgress(jobId, each));
}
return result;
}
- private List<TaskProgress> getInventoryTaskProgress(final long jobId, final String shardingItem) {
+ private InventoryTaskGroupProgress getInventoryTaskProgress(final long jobId, final String shardingItem) {
InventoryPositionGroup inventoryPositionGroup = InventoryPositionGroup.fromJson(
REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.POSITION, shardingItem, ScalingConstant.INVENTORY)));
- List<TaskProgress> result = inventoryPositionGroup.getUnfinished().keySet().stream().map(each -> new InventoryTaskProgress(each, false)).collect(Collectors.toList());
- result.addAll(inventoryPositionGroup.getFinished().stream().map(each -> new InventoryTaskProgress(each, true)).collect(Collectors.toList()));
- return result;
+ List<TaskProgress> unfinished = inventoryPositionGroup.getUnfinished().keySet().stream().map(each -> new InventoryTaskProgress(each, false)).collect(Collectors.toList());
+ List<TaskProgress> finished = inventoryPositionGroup.getFinished().stream().map(each -> new InventoryTaskProgress(each, true)).collect(Collectors.toList());
+ return new InventoryTaskGroupProgress(shardingItem, unfinished.size() + finished.size(), finished.size());
}
- private List<TaskProgress> getIncrementalTaskProgress(final long jobId, final String shardingItem) {
+ private List<IncrementalTaskProgress> getIncrementalTaskProgress(final long jobId, final String shardingItem) {
String position = REGISTRY_REPOSITORY.get(ScalingTaskUtil.getScalingListenerPath(jobId, ScalingConstant.POSITION, shardingItem, ScalingConstant.INCREMENTAL));
JsonObject jsonObject = GSON.fromJson(position, JsonObject.class);
return jsonObject.entrySet().stream()
- .map(entry -> new IncrementalTaskProgress(entry.getKey(), entry.getValue().getAsJsonObject().get(ScalingConstant.DELAY).getAsLong(), null))
+ .map(entry -> new IncrementalTaskProgress(entry.getKey(), shardingItem, entry.getValue().getAsJsonObject().get(ScalingConstant.DELAY).getAsLong(), null))
.collect(Collectors.toList());
}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
index 9363a4a..dff7a80 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/impl/StandaloneScalingJobService.java
@@ -22,11 +22,14 @@ import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundExcept
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
import org.apache.shardingsphere.scaling.core.job.preparer.ScalingJobPreparer;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskGroupProgress;
+import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
import org.apache.shardingsphere.scaling.core.schedule.JobStatus;
import org.apache.shardingsphere.scaling.core.schedule.ScalingTaskScheduler;
import org.apache.shardingsphere.scaling.core.service.AbstractScalingJobService;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -81,12 +84,17 @@ public final class StandaloneScalingJobService extends AbstractScalingJobService
public JobProgress getProgress(final long jobId) {
JobProgress result = new JobProgress(jobId, getJob(jobId).getStatus());
if (scalingTaskSchedulerMap.containsKey(jobId)) {
- result.getInventoryTaskProgress().put("0", scalingTaskSchedulerMap.get(jobId).getInventoryTaskProgress());
- result.getIncrementalTaskProgress().put("0", scalingTaskSchedulerMap.get(jobId).getIncrementalTaskProgress());
+ result.getInventoryTaskProgress().add(getInventoryTaskProgress(jobId));
+ result.getIncrementalTaskProgress().addAll(scalingTaskSchedulerMap.get(jobId).getIncrementalTaskProgress());
}
return result;
}
+ private InventoryTaskGroupProgress getInventoryTaskProgress(final long jobId) {
+ Collection<InventoryTaskProgress> inventoryTaskProgress = scalingTaskSchedulerMap.get(jobId).getInventoryTaskProgress();
+ return new InventoryTaskGroupProgress("", inventoryTaskProgress.size(), (int) inventoryTaskProgress.stream().filter(InventoryTaskProgress::isFinished).count());
+ }
+
@Override
public void remove(final long jobId) {
stop(jobId);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index c62f2de..778fd13 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -22,9 +22,7 @@ import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.position.FinishedPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
-import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTask;
-import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
import java.util.Collection;
@@ -51,9 +49,8 @@ public final class ScalingTaskUtil {
* @return almost finished or not
*/
public static boolean allTasksAlmostFinished(final JobProgress jobProgress, final JobConfiguration jobConfig) {
- return jobProgress.getInventoryTaskProgress().values().stream().flatMap(Collection::stream).allMatch(each -> ((InventoryTaskProgress) each).isFinished())
- && jobProgress.getIncrementalTaskProgress().values().stream().flatMap(Collection::stream)
- .allMatch(each -> ((IncrementalTaskProgress) each).getDelayMillisecond() < jobConfig.getAllowDelay());
+ return jobProgress.getInventoryTaskProgress().stream().allMatch(each -> each.getTotal() == each.getFinished())
+ && jobProgress.getIncrementalTaskProgress().stream().allMatch(each -> each.getDelayMillisecond() < jobConfig.getAllowDelay());
}
/**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index c886d3b..1fa422d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -29,8 +29,6 @@ import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundException;
import org.apache.shardingsphere.scaling.core.job.JobProgress;
import org.apache.shardingsphere.scaling.core.job.ScalingJob;
-import org.apache.shardingsphere.scaling.core.job.task.incremental.IncrementalTaskProgress;
-import org.apache.shardingsphere.scaling.core.job.task.inventory.InventoryTaskProgress;
import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
@@ -120,13 +118,11 @@ public final class DistributedScalingJobServiceTest {
registryRepository.persist(ScalingTaskUtil.getScalingListenerPath("1/position/1/incremental"),
"{'ds2':{'filename':binlog1,'position':4,'delay':2},'ds4':{'filename':binlog2,'position':4,'delay':4}}");
JobProgress actual = scalingJobService.getProgress(1);
- assertThat(actual.getInventoryTaskProgress().get("0").stream()
- .map(each -> (InventoryTaskProgress) each)
- .filter(InventoryTaskProgress::isFinished).count(), is(2L));
- assertTrue(actual.getIncrementalTaskProgress().get("1").stream()
- .map(each -> (IncrementalTaskProgress) each)
- .filter(each -> "ds2".equals(each.getId()))
- .allMatch(each -> 2 == each.getDelayMillisecond()));
+ assertThat(actual.getInventoryTaskProgress().size(), is(2));
+ assertThat(actual.getIncrementalTaskProgress().size(), is(4));
+ assertThat(actual.getInventoryTaskProgress().get(0).getTotal(), is(5));
+ assertThat(actual.getInventoryTaskProgress().get(0).getFinished(), is(2));
+ assertThat(actual.getIncrementalTaskProgress().get(0).getDelayMillisecond(), is(1L));
}
@Test