You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/26 05:43:56 UTC

[shardingsphere] branch master updated: Simplify migration IT method and fix occasional CI error. (#21171)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c91e146951a Simplify migration IT method and fix occasional CI error. (#21171)
c91e146951a is described below

commit c91e146951a88d282d4da482c22ea3c8f8254758
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Sep 26 13:43:48 2022 +0800

    Simplify migration IT method and fix occasional CI error. (#21171)
    
    * Simplify migration IT method.
    
    * Decrease initialDelay in job persist service
---
 .../persist/PipelineJobProgressPersistService.java |  2 +-
 .../data/pipeline/cases/base/BaseITCase.java       | 19 ++++++++++------
 .../cases/migration/AbstractMigrationITCase.java   | 26 ----------------------
 .../migration/general/MySQLMigrationGeneralIT.java |  2 +-
 .../general/PostgreSQLMigrationGeneralIT.java      |  5 +++--
 .../primarykey/TextPrimaryKeyMigrationIT.java      |  2 +-
 6 files changed, 18 insertions(+), 38 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 2fc2045a136..c57e946ccd0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -47,7 +47,7 @@ public final class PipelineJobProgressPersistService {
     private static final long DELAY_SECONDS = 1;
     
     static {
-        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, DELAY_SECONDS, TimeUnit.SECONDS);
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 0, DELAY_SECONDS, TimeUnit.SECONDS);
     }
     
     /**
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 2a0f31d0f45..dae115c71c3 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -62,7 +62,6 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -271,19 +270,25 @@ public abstract class BaseITCase {
         getIncreaseTaskThread().start();
     }
     
-    protected List<Map<String, Object>> waitJobFinished(final String distSQL) throws InterruptedException {
+    protected List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) throws InterruptedException {
         if (null != getIncreaseTaskThread()) {
             TimeUnit.SECONDS.timedJoin(getIncreaseTaskThread(), 60);
         }
-        Set<String> actualStatus;
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 15; i++) {
             List<Map<String, Object>> listJobStatus = queryForListWithLog(distSQL);
             log.info("show status result: {}", listJobStatus);
-            actualStatus = listJobStatus.stream().map(each -> each.get("status").toString()).collect(Collectors.toSet());
-            assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
-                    JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+            Set<String> actualStatus = new HashSet<>();
+            List<Integer> incrementalIdleSecondsList = new ArrayList<>();
             for (Map<String, Object> each : listJobStatus) {
                 assertTrue(StringUtils.isBlank(each.get("error_message").toString()));
+                actualStatus.add(each.get("status").toString());
+                incrementalIdleSecondsList.add(Integer.parseInt(each.get("incremental_idle_seconds").toString()));
+            }
+            assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
+                    JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+            if (Collections.min(incrementalIdleSecondsList) <= 5) {
+                ThreadUtil.sleep(3, TimeUnit.SECONDS);
+                continue;
             }
             if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
                 return listJobStatus;
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 97aafb8cd7a..445fe1a5492 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -19,8 +19,6 @@ package org.apache.shardingsphere.integration.data.pipeline.cases.migration;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseITCase;
 import org.apache.shardingsphere.integration.data.pipeline.command.MigrationDistSQLCommand;
 import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
@@ -33,7 +31,6 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -163,33 +160,10 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
     }
     
     protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) {
-        for (int i = 0; i < 5; i++) {
-            if (checkJobIncrementTaskFinished(jobId)) {
-                break;
-            }
-            ThreadUtil.sleep(3, TimeUnit.SECONDS);
-        }
-        boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
-        log.info("second check job result: {}", secondCheckJobResult);
         List<Map<String, Object>> checkJobResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType));
         log.info("check job results: {}", checkJobResults);
         for (Map<String, Object> entry : checkJobResults) {
             assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
         }
     }
-    
-    protected boolean checkJobIncrementTaskFinished(final String jobId) {
-        List<Map<String, Object>> listJobStatus = queryForListWithLog(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        log.info("list job status result: {}", listJobStatus);
-        for (Map<String, Object> entry : listJobStatus) {
-            if (!JobStatus.EXECUTE_INCREMENTAL_TASK.name().equalsIgnoreCase(entry.get("status").toString())) {
-                return false;
-            }
-            int incrementalIdleSeconds = Integer.parseInt(entry.get("incremental_idle_seconds").toString());
-            if (incrementalIdleSeconds < 3) {
-                return false;
-            }
-        }
-        return true;
-    }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index e817099770b..0c4047c2286 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -112,7 +112,7 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
     }
     
     private void assertMigrationSuccessById(final String jobId) throws SQLException, InterruptedException {
-        List<Map<String, Object>> jobStatus = waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        List<Map<String, Object>> jobStatus = waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         for (Map<String, Object> each : jobStatus) {
             assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
         }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 582c4842de1..ef5b3421a8e 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -114,11 +114,12 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
         startMigrationWithSchema(getSourceTableOrderName(), "t_order");
         startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, SCHEMA_NAME, getSourceTableOrderName(), 20));
         String jobId = getJobIdByTableName(getSourceTableOrderName());
-        waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(jobId);
         sourceExecuteWithLog(String.format("INSERT INTO %s.%s (order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(),
                 1, "afterStop"));
         startMigrationByJobId(jobId);
+        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }
@@ -126,7 +127,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
     private void checkOrderItemMigration() throws SQLException, InterruptedException {
         startMigrationWithSchema("t_order_item", "t_order_item");
         String jobId = getJobIdByTableName("t_order_item");
-        waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
         stopMigrationByJobId(jobId);
     }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index 45258013c6b..bd74d687c65 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -89,8 +89,8 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
         createTargetOrderTableRule();
         startMigration(getSourceTableOrderName(), getTargetTableOrderName());
         String jobId = listJobId().get(0);
-        waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(), "1000000000", 1, "afterStop"));
+        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         // TODO The ordering of primary or unique keys for text types is different, but can't reproduce now
         if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
             assertCheckMigrationSuccess(jobId, "DATA_MATCH");