You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/11/15 01:56:54 UTC

[shardingsphere] branch master updated: Refactor migration inventory finished percentage (#22171)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b088c3610a7 Refactor migration inventory finished percentage (#22171)
b088c3610a7 is described below

commit b088c3610a7f61370609aabd9b5ff13e038170aa
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Nov 15 09:56:37 2022 +0800

    Refactor migration inventory finished percentage (#22171)
    
    * Refactor migration inventory finished percentage
    
    * Add IT
    
    * Fix codestyle
    
    * Update unit test
---
 .../ShowMigrationJobStatusQueryResultSet.java      |  2 +-
 .../progress/JobItemInventoryTasksProgress.java    | 21 +++----------
 .../api/pojo/InventoryIncrementalJobItemInfo.java  |  4 ++-
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 17 +++++++----
 .../migration/general/MySQLMigrationGeneralIT.java |  1 +
 .../core/api/impl/MigrationJobAPIImplTest.java     | 35 ++++++++++++++++++++++
 .../InventoryIncrementalJobItemProgressTest.java   | 11 -------
 ...ntoryIncrementalJobItemProgressSwapperTest.java |  2 --
 8 files changed, 55 insertions(+), 38 deletions(-)

diff --git a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index fd22fe35d92..ac8944626a5 100644
--- a/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ b/features/sharding/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -56,7 +56,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
                 result.add(jobItemProgress.getStatus());
                 result.add(jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString());
                 result.add(jobItemProgress.getProcessedRecordsCount());
-                result.add(jobItemProgress.getInventory().getInventoryFinishedPercentage());
+                result.add(each.getInventoryFinishedPercentage());
                 String incrementalIdleSeconds = "";
                 if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
                     long latestActiveTimeMillis = Math.max(each.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java
index b3ff1ad1598..7265108bc9a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemInventoryTasksProgress.java
@@ -17,15 +17,15 @@
 
 package org.apache.shardingsphere.data.pipeline.api.job.progress;
 
-import java.util.Map;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 /**
  * Job item inventory tasks progress.
  */
@@ -47,17 +47,4 @@ public final class JobItemInventoryTasksProgress {
                 .filter(entry -> pattern.matcher(entry.getKey()).find())
                 .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getPosition()));
     }
-    
-    /**
-     * Get inventory finished percentage.
-     *
-     * @return finished percentage
-     */
-    public int getInventoryFinishedPercentage() {
-        // TODO finished percentage is not accurate enough
-        long finished = inventoryTaskProgressMap.values().stream()
-                .filter(each -> each.getPosition() instanceof FinishedPosition)
-                .count();
-        return inventoryTaskProgressMap.isEmpty() ? 0 : (int) (finished * 100 / inventoryTaskProgressMap.size());
-    }
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
index 2521ef3652e..117cdcbaac3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
  */
 @RequiredArgsConstructor
 @Getter
-public final class InventoryIncrementalJobItemInfo {
+public class InventoryIncrementalJobItemInfo {
     
     private final int shardingItem;
     
@@ -34,5 +34,7 @@ public final class InventoryIncrementalJobItemInfo {
     
     private final long startTimeMillis;
     
+    private final int inventoryFinishedPercentage;
+    
     private final String errorMessage;
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 525ba120cc3..d5f76459eb1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -50,10 +50,10 @@ import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -109,15 +109,20 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         PipelineJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
         long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
-        List<InventoryIncrementalJobItemInfo> result = new ArrayList<>(jobProgress.size());
+        List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
-            String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
-            InventoryIncrementalJobItemInfo progressInfo = new InventoryIncrementalJobItemInfo(shardingItem, entry.getValue(), startTimeMillis, errorMessage);
-            if (null == entry.getValue()) {
+            InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
+            if (null == jobItemProgress) {
+                result.add(new InventoryIncrementalJobItemInfo(shardingItem, null, startTimeMillis, 0, ""));
                 continue;
             }
-            result.add(progressInfo);
+            int inventoryFinishedPercentage = 0;
+            if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
+                inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
+            }
+            String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
+            result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
         }
         return result;
     }
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 24276984233..73f9a39aad4 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -113,6 +113,7 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
         List<Map<String, Object>> jobStatus = waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         for (Map<String, Object> each : jobStatus) {
             assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
+            assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()), is(100));
         }
         assertCheckMigrationSuccess(jobId, algorithmType);
     }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index 3817a5a8584..4f5de0fcd58 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -29,8 +29,10 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
@@ -41,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsist
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -55,6 +58,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
@@ -279,4 +283,35 @@ public final class MigrationJobAPIImplTest {
         Collection<Object> objects = actual.iterator().next();
         assertThat(objects.toArray()[0], is("ds_0"));
     }
+    
+    @Test
+    public void assertGetJobItemInfosAtBegin() {
+        Optional<String> optional = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+        assertTrue(optional.isPresent());
+        String jobId = optional.get();
+        YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress();
+        yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 0, YamlEngine.marshal(yamlJobItemProgress));
+        List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(jobId);
+        assertThat(jobItemInfos.size(), is(1));
+        InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
+        assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.RUNNING));
+        assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(0));
+    }
+    
+    @Test
+    public void assertGetJobItemInfosAtIncrementTask() {
+        Optional<String> optional = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+        assertTrue(optional.isPresent());
+        YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress();
+        yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name());
+        yamlJobItemProgress.setProcessedRecordsCount(100);
+        yamlJobItemProgress.setInventoryRecordsCount(50);
+        String jobId = optional.get();
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 0, YamlEngine.marshal(yamlJobItemProgress));
+        List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(jobId);
+        InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
+        assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK));
+        assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(100));
+    }
 }
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
index 4225dc42f82..16c4af34364 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/InventoryIncrementalJobItemProgressTest.java
@@ -67,17 +67,6 @@ public final class InventoryIncrementalJobItemProgressTest {
         assertThat(actual.getInventory().getInventoryPosition("ds1").get("ds1.t_2"), instanceOf(IntegerPrimaryKeyPosition.class));
     }
     
-    @Test
-    public void assertGetInventoryFinishedPercentage() {
-        InventoryIncrementalJobItemProgress actual = getJobItemProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
-        assertThat(actual.getInventory().getInventoryFinishedPercentage(), is(50));
-    }
-    
-    @Test
-    public void assertGetAllFinishedInventoryFinishedPercentage() {
-        assertThat(getJobItemProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getInventory().getInventoryFinishedPercentage(), is(100));
-    }
-    
     @Test
     public void assertGetIncrementalLatestActiveTimeMillis() {
         assertThat(getJobItemProgress(ConfigurationFileUtil.readFile("job-progress.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L));
diff --git a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
index 13189d14201..958f982d86d 100644
--- a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
+++ b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapperTest.java
@@ -59,7 +59,6 @@ public final class YamlInventoryIncrementalJobItemProgressSwapperTest {
         InventoryIncrementalJobItemProgress progress = SWAPPER.swapToObject(yamlProgress);
         assertNotNull(progress.getInventory());
         assertNotNull(progress.getIncremental());
-        assertThat(progress.getInventory().getInventoryFinishedPercentage(), is(0));
         assertThat(progress.getDataSourceName(), is("ds_0"));
         assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L));
         YamlInventoryIncrementalJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress);
@@ -74,7 +73,6 @@ public final class YamlInventoryIncrementalJobItemProgressSwapperTest {
         InventoryIncrementalJobItemProgress progress = SWAPPER.swapToObject(yamlProgress);
         assertNotNull(progress.getInventory());
         assertNotNull(progress.getIncremental());
-        assertThat(progress.getInventory().getInventoryFinishedPercentage(), is(0));
         assertThat(progress.getDataSourceName(), is("ds_0"));
         assertThat(progress.getIncremental().getIncrementalLatestActiveTimeMillis(), is(0L));
         YamlInventoryIncrementalJobItemProgress actual = SWAPPER.swapToYamlConfiguration(progress);