You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/11/15 01:56:54 UTC
[shardingsphere] branch master updated: Refactor migration inventory finished percentage (#22171)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b088c3610a7 Refactor migration inventory finished percentage (#22171)
b088c3610a7 is described below
commit b088c3610a7f61370609aabd9b5ff13e038170aa
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Nov 15 09:56:37 2022 +0800
Refactor migration inventory finished percentage (#22171)
* Refactor migration inventory finished percentage
* Add integration test (IT)
* Fix code style
* Update unit test
---
.../ShowMigrationJobStatusQueryResultSet.java | 2 +-
.../progress/JobItemInventoryTasksProgress.java | 21 +++----------
.../api/pojo/InventoryIncrementalJobItemInfo.java | 4 ++-
.../AbstractInventoryIncrementalJobAPIImpl.java | 17 +++++++----
.../migration/general/MySQLMigrationGeneralIT.java | 1 +
.../core/api/impl/MigrationJobAPIImplTest.java | 35 ++++++++++++++++++++++
.../InventoryIncrementalJobItemProgressTest.java | 11 -------
...ntoryIncrementalJobItemProgressSwapperTest.java | 2 --
8 files changed, 55 insertions(+), 38 deletions(-)
diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index fd22fe35d92..ac8944626a5 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -56,7 +56,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
result.add(jobItemProgress.getStatus());
result.add(jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
result.add(jobItemProgress.getProcessedRecordsCount());
- result.add(jobItemProgress.getInventory().getInventoryFinishedPercentage());
+ result.add(each.getInventoryFinishedPercentage());
String incrementalIdleSeconds = "";
if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
long latestActiveTimeMillis = Math.max(each.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java
index b3ff1ad1598..7265108bc9a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java
@@ -17,15 +17,15 @@
package org.apache.shardingsphere.data.pipeline.api.job.progress;
-import java.util.Map;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
/**
* Job item inventory tasks progress.
*/
@@ -47,17 +47,4 @@ public final class JobItemInventoryTasksProgress {
.filter(entry -> pattern.matcher(entry.getKey()).find())
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getPosition()));
}
-
- /**
- * Get inventory finished percentage.
- *
- * @return finished percentage
- */
- public int getInventoryFinishedPercentage() {
- // TODO finished percentage is not accurate enough
- long finished = inventoryTaskProgressMap.values().stream()
- .filter(each -> each.getPosition() instanceof FinishedPosition)
- .count();
- return inventoryTaskProgressMap.isEmpty() ? 0 : (int) (finished * 100 / inventoryTaskProgressMap.size());
- }
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
index 2521ef3652e..117cdcbaac3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
*/
@RequiredArgsConstructor
@Getter
-public final class InventoryIncrementalJobItemInfo {
+public class InventoryIncrementalJobItemInfo {
private final int shardingItem;
@@ -34,5 +34,7 @@ public final class InventoryIncrementalJobItemInfo {
private final long startTimeMillis;
+ private final int inventoryFinishedPercentage;
+
private final String errorMessage;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 525ba120cc3..d5f76459eb1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -50,10 +50,10 @@ import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -109,15 +109,20 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
PipelineJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
- List<InventoryIncrementalJobItemInfo> result = new ArrayList<>(jobProgress.size());
+ List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
int shardingItem = entry.getKey();
- String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
- InventoryIncrementalJobItemInfo progressInfo = new InventoryIncrementalJobItemInfo(shardingItem, entry.getValue(), startTimeMillis, errorMessage);
- if (null == entry.getValue()) {
+ InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
+ if (null == jobItemProgress) {
+ result.add(new InventoryIncrementalJobItemInfo(shardingItem, null, startTimeMillis, 0, ""));
continue;
}
- result.add(progressInfo);
+ int inventoryFinishedPercentage = 0;
+ if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
+ inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
+ }
+ String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
+ result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
}
return result;
}
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 24276984233..73f9a39aad4 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -113,6 +113,7 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
List<Map<String, Object>> jobStatus = waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
+ assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()), is(100));
}
assertCheckMigrationSuccess(jobId, algorithmType);
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 3817a5a8584..4f5de0fcd58 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -29,8 +29,10 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
@@ -41,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsist
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -55,6 +58,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
@@ -279,4 +283,35 @@ public final class MigrationJobAPIImplTest {
Collection<Object> objects = actual.iterator().next();
assertThat(objects.toArray()[0], is("ds_0"));
}
+
+ @Test
+ public void assertGetJobItemInfosAtBegin() {
+ Optional<String> optional = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ assertTrue(optional.isPresent());
+ String jobId = optional.get();
+ YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress();
+ yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 0, YamlEngine.marshal(yamlJobItemProgress));
+ List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(jobId);
+ assertThat(jobItemInfos.size(), is(1));
+ InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
+ assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.RUNNING));
+ assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(0));
+ }
+
+ @Test
+ public void assertGetJobItemInfosAtIncrementTask() {
+ Optional<String> optional = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ assertTrue(optional.isPresent());
+ YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress();
+ yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
+ yamlJobItemProgress.setProcessedRecordsCount(100);
+ yamlJobItemProgress.setInventoryRecordsCount(50);
+ String jobId = optional.get();
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 0, YamlEngine.marshal(yamlJobItemProgress));
+ List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(jobId);
+ InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
+ assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK));
+ assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(100));
+ }
}
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
index 4225dc42f82..16c4af34364 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -67,17 +67,6 @@ public final class InventoryIncrementalJobItemProgressTest {
assertThat(actual.getInventory().getInventoryPosition("ds1").get("ds1.t_2"), instanceOf(IntegerPrimaryKeyPosition.class));
}
- @Test
- public void assertGetInventoryFinishedPercentage() {
- InventoryIncrementalJobItemProgress actual = getJobItemProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
- assertThat(actual.getInventory().getInventoryFinishedPercentage(), is(50));
- }
-
- @Test
- public void assertGetAllFinishedInventoryFinishedPercentage() {
- assertThat(getJobItemProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getInventory().getInventoryFinishedPercentage(), is(100));
- }
-
@Test
public void assertGetIncrementalLatestActiveTimeMillis() {
assertThat(getJobItemProgress(ConfigurationFileUtil.readFile("job-progress.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L));
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
index 13189d14201..958f982d86d 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
@@ -59,7 +59,6 @@ public final class YamlInventoryIncrementalJobItemProgressSwapperTest {
InventoryIncrementalJobItemProgress progress = SWAPPER.swapToObject(yamlProgress);
assertNotNull(progress.getInventory());
assertNotNull(progress.getIncremental());
- assertThat(progress.getInventory().getInventoryFinishedPercentage(), is(0));
assertThat(progress.getDataSourceName(), is("ds_0"));
assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L));
YamlInventoryIncrementalJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress);
@@ -74,7 +73,6 @@ public final class YamlInventoryIncrementalJobItemProgressSwapperTest {
InventoryIncrementalJobItemProgress progress = SWAPPER.swapToObject(yamlProgress);
assertNotNull(progress.getInventory());
assertNotNull(progress.getIncremental());
- assertThat(progress.getInventory().getInventoryFinishedPercentage(), is(0));
assertThat(progress.getDataSourceName(), is("ds_0"));
assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L));
YamlInventoryIncrementalJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress);