Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/11/09 02:04:12 UTC

[shardingsphere] branch master updated: Improve migration IT: use a separate key generate algorithm and add a wait-job-prepare-success check method (#22013)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a1dc830da95 Improve migration IT: use a separate key generate algorithm and add a wait-job-prepare-success check method (#22013)
a1dc830da95 is described below

commit a1dc830da956ea22a88745e327fef5abbbdc7117
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Nov 9 10:04:06 2022 +0800

    Improve migration IT: use a separate key generate algorithm and add a wait-job-prepare-success check method (#22013)
    
    * Improve migration IT
    
    * Fix codestyle
    
    * Fix
---
 .../data/pipeline/cases/base/BaseITCase.java       | 25 ++++++++++++++--------
 .../cases/migration/AbstractMigrationITCase.java   |  6 +++---
 .../migration/general/MySQLMigrationGeneralIT.java | 11 +++++-----
 .../general/PostgreSQLMigrationGeneralIT.java      |  8 +++----
 .../framework/helper/ScalingCaseHelper.java        | 13 +++++------
 5 files changed, 34 insertions(+), 29 deletions(-)

diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 4800822b4a7..b15e5ddb784 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -22,7 +22,6 @@ import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
@@ -62,6 +61,7 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -225,6 +225,16 @@ public abstract class BaseITCase {
         ThreadUtil.sleep(Math.max(sleepSeconds, 0), TimeUnit.SECONDS);
     }
     
+    protected void waitJobPrepareSuccess(final String distSQL) {
+        for (int i = 0; i < 5; i++) {
+            List<Map<String, Object>> jobStatus = queryForListWithLog(distSQL);
+            Set<String> statusSet = jobStatus.stream().map(each -> String.valueOf(each.get("status"))).collect(Collectors.toSet());
+            if (statusSet.contains(JobStatus.PREPARING.name()) || statusSet.contains(JobStatus.RUNNING.name())) {
+                ThreadUtil.sleep(2, TimeUnit.SECONDS);
+            }
+        }
+    }
+    
     protected void connectionExecuteWithLog(final Connection connection, final String sql) throws SQLException {
         log.info("connection execute:{}", sql);
         connection.createStatement().execute(sql);
@@ -272,19 +282,20 @@ public abstract class BaseITCase {
         if (null != getIncreaseTaskThread()) {
             TimeUnit.SECONDS.timedJoin(getIncreaseTaskThread(), 60);
         }
-        for (int i = 0; i < 15; i++) {
+        for (int i = 0; i < 10; i++) {
             List<Map<String, Object>> listJobStatus = queryForListWithLog(distSQL);
             log.info("show status result: {}", listJobStatus);
             Set<String> actualStatus = new HashSet<>();
             List<Integer> incrementalIdleSecondsList = new ArrayList<>();
             for (Map<String, Object> each : listJobStatus) {
-                assertTrue(Strings.isNullOrEmpty(each.get("error_message").toString()));
+                assertTrue("error_message is not null", Strings.isNullOrEmpty(each.get("error_message").toString()));
                 actualStatus.add(each.get("status").toString());
                 String incrementalIdleSeconds = each.get("incremental_idle_seconds").toString();
                 incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ? 0 : Integer.parseInt(incrementalIdleSeconds));
             }
-            assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
-                    JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+            assertFalse("status is JobStatus.PREPARING_FAILURE", actualStatus.contains(JobStatus.PREPARING_FAILURE.name()));
+            assertFalse("status is JobStatus.EXECUTE_INVENTORY_TASK_FAILURE", actualStatus.contains(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()));
+            assertFalse("status is JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE", actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()));
             if (Collections.min(incrementalIdleSecondsList) <= 5) {
                 ThreadUtil.sleep(3, TimeUnit.SECONDS);
                 continue;
@@ -292,10 +303,6 @@ public abstract class BaseITCase {
             if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
                 return listJobStatus;
             }
-            if (actualStatus.size() >= 1 && actualStatus.containsAll(new HashSet<>(Arrays.asList("", JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
-                log.warn("one of the shardingItem was not started correctly");
-            }
-            ThreadUtil.sleep(3, TimeUnit.SECONDS);
         }
         return Collections.emptyList();
     }
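
Note on the new method: waitJobPrepareSuccess polls the status result set up to five times, sleeping two seconds whenever any shard still reports PREPARING or RUNNING, and falls through silently once the attempts are used up. The same bounded-poll pattern with an explicit early exit could be sketched as follows (standalone illustration only; PollUntilPrepared and its Supplier are hypothetical names, not part of this commit):

    import java.util.Set;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;
    
    final class PollUntilPrepared {
        
        // Poll the shard statuses up to maxAttempts times, sleeping between
        // attempts while any shard is still PREPARING or RUNNING; return true
        // as soon as preparation has finished everywhere.
        static boolean await(final Supplier<Set<String>> statuses, final int maxAttempts) throws InterruptedException {
            for (int i = 0; i < maxAttempts; i++) {
                Set<String> current = statuses.get();
                if (!current.contains("PREPARING") && !current.contains("RUNNING")) {
                    return true;
                }
                TimeUnit.SECONDS.sleep(2L);
            }
            return false;
        }
    }
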
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index fdfcc22959d..b07b5cf2296 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -175,9 +175,9 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
             ThreadUtil.sleep(3, TimeUnit.SECONDS);
         }
         log.info("check job results: {}", resultList);
-        for (Map<String, Object> entry : resultList) {
-            assertTrue(Boolean.parseBoolean(entry.get("result").toString()));
-            assertThat(entry.get("finished_percentage").toString(), is("100"));
+        for (Map<String, Object> each : resultList) {
+            assertTrue("check result is false", Boolean.parseBoolean(each.get("result").toString()));
+            assertThat("finished_percentage is not 100", each.get("finished_percentage").toString(), is("100"));
         }
     }
 }
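
This hunk also switches to the message-carrying assertion overloads (and renames entry to each, matching the loop-variable convention used elsewhere in these tests), so a CI failure states which expectation broke. In JUnit 4 and Hamcrest the reason string is the first parameter:

    import static org.hamcrest.CoreMatchers.is;
    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.junit.Assert.assertTrue;
    
    import org.junit.Test;
    
    public final class MessageFirstAssertExample {
        
        @Test
        public void assertWithReason() {
            // On failure, the leading reason string is included in the error.
            assertTrue("check result is false", Boolean.parseBoolean("true"));
            assertThat("finished_percentage is not 100", "100", is("100"));
        }
    }
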
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 401fae91a1f..24276984233 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.integration.data.pipeline.cases.task.MySQLIncre
 import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
-import org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -86,20 +86,19 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
         createTargetOrderTableRule();
         createTargetOrderTableEncryptRule();
         createTargetOrderItemTableRule();
-        AutoIncrementKeyGenerateAlgorithm keyGenerateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
         JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
-        Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 3000);
+        Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(parameterized.getDatabaseType(), 3000);
         log.info("init data begin: {}", LocalDateTime.now());
         jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
         jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
         log.info("init data end: {}", LocalDateTime.now());
         startMigration(getSourceTableOrderName(), getTargetTableOrderName());
         startMigration("t_order_item", "t_order_item");
-        // TODO wait preparation done (binlog position got), else insert/update/delete might be consumed in inventory task
-        startIncrementTask(new MySQLIncrementTask(jdbcTemplate, getSourceTableOrderName(), keyGenerateAlgorithm, 30));
         String orderJobId = getJobIdByTableName(getSourceTableOrderName());
-        String orderItemJobId = getJobIdByTableName("t_order_item");
+        waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
+        startIncrementTask(new MySQLIncrementTask(jdbcTemplate, getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), 30));
         assertMigrationSuccessById(orderJobId, "DATA_MATCH");
+        String orderItemJobId = getJobIdByTableName("t_order_item");
         assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
         assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
         for (String each : listJobId()) {
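
This reordering implements the TODO it removes: the increment task must not start until the migration job has finished preparing (i.e. the binlog position has been captured), otherwise the incremental insert/update/delete traffic could be consumed by the inventory task. Condensed from the hunk above (all method names are the test class's own):

    // Start the migrations, then gate the incremental workload on preparation.
    startMigration(getSourceTableOrderName(), getTargetTableOrderName());
    startMigration("t_order_item", "t_order_item");
    String orderJobId = getJobIdByTableName(getSourceTableOrderName());
    // Block until no shard reports PREPARING/RUNNING for this job.
    waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
    // Only now write incremental data, keyed by a fresh snowflake generator.
    startIncrementTask(new MySQLIncrementTask(jdbcTemplate, getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), 30));
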
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index f4ab9cada7d..54de9982287 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.integration.data.pipeline.cases.task.PostgreSQL
 import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
 import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
-import org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -49,8 +49,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 @Slf4j
 public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase {
     
-    private static final AutoIncrementKeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new AutoIncrementKeyGenerateAlgorithm();
-    
     private final ScalingParameterized parameterized;
     
     public PostgreSQLMigrationGeneralIT(final ScalingParameterized parameterized) {
@@ -91,7 +89,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
         addMigrationTargetResource();
         createTargetOrderTableRule();
         createTargetOrderItemTableRule();
-        Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(KEY_GENERATE_ALGORITHM, parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
+        Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
         JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
         log.info("init data begin: {}", LocalDateTime.now());
         jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
@@ -113,7 +111,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
         String jobId = getJobIdByTableName(getSourceTableOrderName());
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(jobId);
-        Comparable<?> recordId = KEY_GENERATE_ALGORITHM.generateKey();
+        long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
         sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", String.join(".", SCHEMA_NAME, getSourceTableOrderName()), recordId, 1, "afterStop"));
         startMigrationByJobId(jobId);
         // must refresh firstly, otherwise proxy can't get schema and table info
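
Dropping the static shared KEY_GENERATE_ALGORITHM also removes mutable state that lived across test methods; the single row inserted after stopping the job now takes its id from a throwaway SnowflakeKeyGenerateAlgorithm. Because snowflake keys are large, timestamp-derived values, the new order_id cannot collide with the small auto-increment ids used to seed the table. An illustrative fragment (the literal is a stand-in for the highest seeded id):

    // Seeded rows carry small, dense auto-increment ids; a snowflake id is a
    // large time-based value, so the post-stop row cannot clash with them.
    long lastSeededOrderId = 3000L;
    long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
    assert recordId > lastSeededOrderId;
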
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
index 8742fc131a7..7e98234e7b3 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
@@ -50,22 +50,23 @@ public final class ScalingCaseHelper {
     }
     
     /**
-     * Generate MySQL insert data, contains full fields.
+     * Generate insert data, contains full fields.
      *
-     * @param orderIdGenerate order id generate algorithm
      * @param databaseType database type
      * @param insertRows insert rows
      * @return insert data list
      */
-    public static Pair<List<Object[]>, List<Object[]>> generateFullInsertData(final AutoIncrementKeyGenerateAlgorithm orderIdGenerate, final DatabaseType databaseType, final int insertRows) {
+    public static Pair<List<Object[]>, List<Object[]>> generateFullInsertData(final DatabaseType databaseType, final int insertRows) {
         if (insertRows < 0) {
             return Pair.of(null, null);
         }
+        AutoIncrementKeyGenerateAlgorithm orderKeyGenerate = new AutoIncrementKeyGenerateAlgorithm();
+        AutoIncrementKeyGenerateAlgorithm orderItemKeyGenerate = new AutoIncrementKeyGenerateAlgorithm();
         List<Object[]> orderData = new ArrayList<>(insertRows);
         List<Object[]> orderItemData = new ArrayList<>(insertRows);
         for (int i = 0; i < insertRows; i++) {
-            Comparable<?> orderId = orderIdGenerate.generateKey();
-            int userId = orderIdGenerate.generateKey();
+            int orderId = orderKeyGenerate.generateKey();
+            int userId = generateInt(0, 6);
             LocalDateTime now = LocalDateTime.now();
             int randomInt = generateInt(-100, 100);
             int randomUnsignedInt = generateInt(0, 100);
@@ -80,7 +81,7 @@ public final class ScalingCaseHelper {
                         BigDecimal.valueOf(generateDouble(1, 100)), true, generateString(2), generateString(2), generateFloat(),
                         generateDouble(0, 1000), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now()});
             }
-            orderItemData.add(new Object[]{SNOWFLAKE_KEY_GENERATE_ALGORITHM.generateKey(), orderId, userId, "SUCCESS"});
+            orderItemData.add(new Object[]{orderItemKeyGenerate.generateKey(), orderId, userId, "SUCCESS"});
         }
         return Pair.of(orderData, orderItemData);
     }
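
With this change, generateFullInsertData owns key generation: t_order and t_order_item each get a fresh AutoIncrementKeyGenerateAlgorithm instead of sharing a caller-supplied generator, and user_id becomes a bounded random value rather than another generated key. A minimal sketch of such a per-instance generator, assuming the test util is simply an AtomicInteger-backed counter (inferred from the int usage above; the real class is org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm):

    import java.util.concurrent.atomic.AtomicInteger;
    
    // Assumption: each instance yields an independent, monotonically
    // increasing id sequence, so two tables' keys never interleave or race.
    public final class AutoIncrementKeyGenerateAlgorithm {
        
        private final AtomicInteger counter = new AtomicInteger(0);
        
        public int generateKey() {
            return counter.incrementAndGet();
        }
    }
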