You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/11/09 02:04:12 UTC
[shardingsphere] branch master updated: Improve migration IT, use a separate key generate and add wait job prepare success check method (#22013)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a1dc830da95 Improve migration IT, use a separate key generate and add wait job prepare success check method (#22013)
a1dc830da95 is described below
commit a1dc830da956ea22a88745e327fef5abbbdc7117
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Nov 9 10:04:06 2022 +0800
Improve migration IT, use a separate key generate and add wait job prepare success check method (#22013)
* Improve migration IT
* Fix codestyle
* Fix
---
.../data/pipeline/cases/base/BaseITCase.java | 25 ++++++++++++++--------
.../cases/migration/AbstractMigrationITCase.java | 6 +++---
.../migration/general/MySQLMigrationGeneralIT.java | 11 +++++-----
.../general/PostgreSQLMigrationGeneralIT.java | 8 +++----
.../framework/helper/ScalingCaseHelper.java | 13 +++++------
5 files changed, 34 insertions(+), 29 deletions(-)
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 4800822b4a7..b15e5ddb784 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -22,7 +22,6 @@ import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
@@ -62,6 +61,7 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -225,6 +225,16 @@ public abstract class BaseITCase {
ThreadUtil.sleep(Math.max(sleepSeconds, 0), TimeUnit.SECONDS);
}
+ protected void waitJobPrepareSuccess(final String distSQL) {
+ for (int i = 0; i < 5; i++) {
+ List<Map<String, Object>> jobStatus = queryForListWithLog(distSQL);
+ Set<String> statusSet = jobStatus.stream().map(each -> String.valueOf(each.get("status"))).collect(Collectors.toSet());
+ if (statusSet.contains(JobStatus.PREPARING.name()) || statusSet.contains(JobStatus.RUNNING.name())) {
+ ThreadUtil.sleep(2, TimeUnit.SECONDS);
+ }
+ }
+ }
+
protected void connectionExecuteWithLog(final Connection connection, final String sql) throws SQLException {
log.info("connection execute:{}", sql);
connection.createStatement().execute(sql);
@@ -272,19 +282,20 @@ public abstract class BaseITCase {
if (null != getIncreaseTaskThread()) {
TimeUnit.SECONDS.timedJoin(getIncreaseTaskThread(), 60);
}
- for (int i = 0; i < 15; i++) {
+ for (int i = 0; i < 10; i++) {
List<Map<String, Object>> listJobStatus = queryForListWithLog(distSQL);
log.info("show status result: {}", listJobStatus);
Set<String> actualStatus = new HashSet<>();
List<Integer> incrementalIdleSecondsList = new ArrayList<>();
for (Map<String, Object> each : listJobStatus) {
- assertTrue(Strings.isNullOrEmpty(each.get("error_message").toString()));
+ assertTrue("error_message is not null", Strings.isNullOrEmpty(each.get("error_message").toString()));
actualStatus.add(each.get("status").toString());
String incrementalIdleSeconds = each.get("incremental_idle_seconds").toString();
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ? 0 : Integer.parseInt(incrementalIdleSeconds));
}
- assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
- JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+ assertFalse("status is JobStatus.PREPARING_FAILURE", actualStatus.contains(JobStatus.PREPARING_FAILURE.name()));
+ assertFalse("status is JobStatus.EXECUTE_INVENTORY_TASK_FAILURE", actualStatus.contains(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()));
+ assertFalse("status is JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE", actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()));
if (Collections.min(incrementalIdleSecondsList) <= 5) {
ThreadUtil.sleep(3, TimeUnit.SECONDS);
continue;
@@ -292,10 +303,6 @@ public abstract class BaseITCase {
if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
return listJobStatus;
}
- if (actualStatus.size() >= 1 && actualStatus.containsAll(new HashSet<>(Arrays.asList("", JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {
- log.warn("one of the shardingItem was not started correctly");
- }
- ThreadUtil.sleep(3, TimeUnit.SECONDS);
}
return Collections.emptyList();
}
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index fdfcc22959d..b07b5cf2296 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -175,9 +175,9 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
ThreadUtil.sleep(3, TimeUnit.SECONDS);
}
log.info("check job results: {}", resultList);
- for (Map<String, Object> entry : resultList) {
- assertTrue(Boolean.parseBoolean(entry.get("result").toString()));
- assertThat(entry.get("finished_percentage").toString(), is("100"));
+ for (Map<String, Object> each : resultList) {
+ assertTrue("check result is false", Boolean.parseBoolean(each.get("result").toString()));
+ assertThat("finished_percentage is not 100", each.get("finished_percentage").toString(), is("100"));
}
}
}
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 401fae91a1f..24276984233 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -25,7 +25,7 @@ import org.apache.shardingsphere.integration.data.pipeline.cases.task.MySQLIncre
import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
-import org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -86,20 +86,19 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
createTargetOrderTableRule();
createTargetOrderTableEncryptRule();
createTargetOrderItemTableRule();
- AutoIncrementKeyGenerateAlgorithm keyGenerateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
- Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, parameterized.getDatabaseType(), 3000);
+ Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(parameterized.getDatabaseType(), 3000);
log.info("init data begin: {}", LocalDateTime.now());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
log.info("init data end: {}", LocalDateTime.now());
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
startMigration("t_order_item", "t_order_item");
- // TODO wait preparation done (binlog position got), else insert/update/delete might be consumed in inventory task
- startIncrementTask(new MySQLIncrementTask(jdbcTemplate, getSourceTableOrderName(), keyGenerateAlgorithm, 30));
String orderJobId = getJobIdByTableName(getSourceTableOrderName());
- String orderItemJobId = getJobIdByTableName("t_order_item");
+ waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
+ startIncrementTask(new MySQLIncrementTask(jdbcTemplate, getSourceTableOrderName(), new SnowflakeKeyGenerateAlgorithm(), 30));
assertMigrationSuccessById(orderJobId, "DATA_MATCH");
+ String orderItemJobId = getJobIdByTableName("t_order_item");
assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
for (String each : listJobId()) {
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index f4ab9cada7d..54de9982287 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.integration.data.pipeline.cases.task.PostgreSQL
import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
-import org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -49,8 +49,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Slf4j
public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase {
- private static final AutoIncrementKeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new AutoIncrementKeyGenerateAlgorithm();
-
private final ScalingParameterized parameterized;
public PostgreSQLMigrationGeneralIT(final ScalingParameterized parameterized) {
@@ -91,7 +89,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
addMigrationTargetResource();
createTargetOrderTableRule();
createTargetOrderItemTableRule();
- Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(KEY_GENERATE_ALGORITHM, parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
+ Pair<List<Object[]>, List<Object[]>> dataPair = ScalingCaseHelper.generateFullInsertData(parameterized.getDatabaseType(), TABLE_INIT_ROW_COUNT);
JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
log.info("init data begin: {}", LocalDateTime.now());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(getSourceTableOrderName()), dataPair.getLeft());
@@ -113,7 +111,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
String jobId = getJobIdByTableName(getSourceTableOrderName());
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(jobId);
- Comparable<?> recordId = KEY_GENERATE_ALGORITHM.generateKey();
+ long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", String.join(".", SCHEMA_NAME, getSourceTableOrderName()), recordId, 1, "afterStop"));
startMigrationByJobId(jobId);
// must refresh firstly, otherwise proxy can't get schema and table info
diff --git a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
index 8742fc131a7..7e98234e7b3 100644
--- a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
+++ b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
@@ -50,22 +50,23 @@ public final class ScalingCaseHelper {
}
/**
- * Generate MySQL insert data, contains full fields.
+ * Generate insert data, contains full fields.
*
- * @param orderIdGenerate order id generate algorithm
* @param databaseType database type
* @param insertRows insert rows
* @return insert data list
*/
- public static Pair<List<Object[]>, List<Object[]>> generateFullInsertData(final AutoIncrementKeyGenerateAlgorithm orderIdGenerate, final DatabaseType databaseType, final int insertRows) {
+ public static Pair<List<Object[]>, List<Object[]>> generateFullInsertData(final DatabaseType databaseType, final int insertRows) {
if (insertRows < 0) {
return Pair.of(null, null);
}
+ AutoIncrementKeyGenerateAlgorithm orderKeyGenerate = new AutoIncrementKeyGenerateAlgorithm();
+ AutoIncrementKeyGenerateAlgorithm orderItemKeyGenerate = new AutoIncrementKeyGenerateAlgorithm();
List<Object[]> orderData = new ArrayList<>(insertRows);
List<Object[]> orderItemData = new ArrayList<>(insertRows);
for (int i = 0; i < insertRows; i++) {
- Comparable<?> orderId = orderIdGenerate.generateKey();
- int userId = orderIdGenerate.generateKey();
+ int orderId = orderKeyGenerate.generateKey();
+ int userId = generateInt(0, 6);
LocalDateTime now = LocalDateTime.now();
int randomInt = generateInt(-100, 100);
int randomUnsignedInt = generateInt(0, 100);
@@ -80,7 +81,7 @@ public final class ScalingCaseHelper {
BigDecimal.valueOf(generateDouble(1, 100)), true, generateString(2), generateString(2), generateFloat(),
generateDouble(0, 1000), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now()});
}
- orderItemData.add(new Object[]{SNOWFLAKE_KEY_GENERATE_ALGORITHM.generateKey(), orderId, userId, "SUCCESS"});
+ orderItemData.add(new Object[]{orderItemKeyGenerate.generateKey(), orderId, userId, "SUCCESS"});
}
return Pair.of(orderData, orderItemData);
}