You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/26 05:43:56 UTC
[shardingsphere] branch master updated: Simply migration IT method and fix occasional ci error. (#21171)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c91e146951a Simply migration IT method and fix occasional ci error. (#21171)
c91e146951a is described below
commit c91e146951a88d282d4da482c22ea3c8f8254758
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Sep 26 13:43:48 2022 +0800
Simply migration IT method and fix occasional ci error. (#21171)
* Simply migration IT method.
* Decrease initialDelay at job persist service
---
.../persist/PipelineJobProgressPersistService.java | 2 +-
.../data/pipeline/cases/base/BaseITCase.java | 19 ++++++++++------
.../cases/migration/AbstractMigrationITCase.java | 26 ----------------------
.../migration/general/MySQLMigrationGeneralIT.java | 2 +-
.../general/PostgreSQLMigrationGeneralIT.java | 5 +++--
.../primarykey/TextPrimaryKeyMigrationIT.java | 2 +-
6 files changed, 18 insertions(+), 38 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 2fc2045a136..c57e946ccd0 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -47,7 +47,7 @@ public final class PipelineJobProgressPersistService {
private static final long DELAY_SECONDS = 1;
static {
- JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, DELAY_SECONDS, TimeUnit.SECONDS);
+ JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 0, DELAY_SECONDS, TimeUnit.SECONDS);
}
/**
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 2a0f31d0f45..dae115c71c3 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -62,7 +62,6 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -271,19 +270,25 @@ public abstract class BaseITCase {
getIncreaseTaskThread().start();
}
- protected List<Map<String, Object>> waitJobFinished(final String distSQL) throws InterruptedException {
+ protected List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) throws InterruptedException {
if (null != getIncreaseTaskThread()) {
TimeUnit.SECONDS.timedJoin(getIncreaseTaskThread(), 60);
}
- Set<String> actualStatus;
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 15; i++) {
List<Map<String, Object>> listJobStatus = queryForListWithLog(distSQL);
log.info("show status result: {}", listJobStatus);
- actualStatus = listJobStatus.stream().map(each -> each.get("status").toString()).collect(Collectors.toSet());
- assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
- JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+ Set<String> actualStatus = new HashSet<>();
+ List<Integer> incrementalIdleSecondsList = new ArrayList<>();
for (Map<String, Object> each : listJobStatus) {
assertTrue(StringUtils.isBlank(each.get("error_message").toString()));
+ actualStatus.add(each.get("status").toString());
+ incrementalIdleSecondsList.add(Integer.parseInt(each.get("incremental_idle_seconds").toString()));
+ }
+ assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
+ JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+ if (Collections.min(incrementalIdleSecondsList) <= 5) {
+ ThreadUtil.sleep(3, TimeUnit.SECONDS);
+ continue;
}
if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
return listJobStatus;
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
index 97aafb8cd7a..445fe1a5492 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/AbstractMigrationITCase.java
@@ -19,8 +19,6 @@ package org.apache.shardingsphere.integration.data.pipeline.cases.migration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseITCase;
import org.apache.shardingsphere.integration.data.pipeline.command.MigrationDistSQLCommand;
import org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEnum;
@@ -33,7 +31,6 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
@@ -163,33 +160,10 @@ public abstract class AbstractMigrationITCase extends BaseITCase {
}
protected void assertCheckMigrationSuccess(final String jobId, final String algorithmType) {
- for (int i = 0; i < 5; i++) {
- if (checkJobIncrementTaskFinished(jobId)) {
- break;
- }
- ThreadUtil.sleep(3, TimeUnit.SECONDS);
- }
- boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
- log.info("second check job result: {}", secondCheckJobResult);
List<Map<String, Object>> checkJobResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType));
log.info("check job results: {}", checkJobResults);
for (Map<String, Object> entry : checkJobResults) {
assertTrue(Boolean.parseBoolean(entry.get("records_content_matched").toString()));
}
}
-
- protected boolean checkJobIncrementTaskFinished(final String jobId) {
- List<Map<String, Object>> listJobStatus = queryForListWithLog(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- log.info("list job status result: {}", listJobStatus);
- for (Map<String, Object> entry : listJobStatus) {
- if (!JobStatus.EXECUTE_INCREMENTAL_TASK.name().equalsIgnoreCase(entry.get("status").toString())) {
- return false;
- }
- int incrementalIdleSeconds = Integer.parseInt(entry.get("incremental_idle_seconds").toString());
- if (incrementalIdleSeconds < 3) {
- return false;
- }
- }
- return true;
- }
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index e817099770b..0c4047c2286 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -112,7 +112,7 @@ public final class MySQLMigrationGeneralIT extends AbstractMigrationITCase {
}
private void assertMigrationSuccessById(final String jobId) throws SQLException, InterruptedException {
- List<Map<String, Object>> jobStatus = waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ List<Map<String, Object>> jobStatus = waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 582c4842de1..ef5b3421a8e 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -114,11 +114,12 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
startMigrationWithSchema(getSourceTableOrderName(), "t_order");
startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, SCHEMA_NAME, getSourceTableOrderName(), 20));
String jobId = getJobIdByTableName(getSourceTableOrderName());
- waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(jobId);
sourceExecuteWithLog(String.format("INSERT INTO %s.%s (order_id,user_id,status) VALUES (%s, %s, '%s')", SCHEMA_NAME, getSourceTableOrderName(), KEY_GENERATE_ALGORITHM.generateKey(),
1, "afterStop"));
startMigrationByJobId(jobId);
+ waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}
@@ -126,7 +127,7 @@ public final class PostgreSQLMigrationGeneralIT extends AbstractMigrationITCase
private void checkOrderItemMigration() throws SQLException, InterruptedException {
startMigrationWithSchema("t_order_item", "t_order_item");
String jobId = getJobIdByTableName("t_order_item");
- waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
stopMigrationByJobId(jobId);
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
index 45258013c6b..bd74d687c65 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationIT.java
@@ -89,8 +89,8 @@ public class TextPrimaryKeyMigrationIT extends AbstractMigrationITCase {
createTargetOrderTableRule();
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
String jobId = listJobId().get(0);
- waitJobFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableOrderName(), "1000000000", 1, "afterStop"));
+ waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
// TODO The ordering of primary or unique keys for text types is different, but can't reproduce now
if (DatabaseTypeUtil.isMySQL(getDatabaseType())) {
assertCheckMigrationSuccess(jobId, "DATA_MATCH");