You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/02 08:05:31 UTC
[shardingsphere] branch master updated: Improve consistency job finished check at pipeline E2E (#26009)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3a3dc2ce4e1 Improve consistency job finished check at pipeline E2E (#26009)
3a3dc2ce4e1 is described below
commit 3a3dc2ce4e14fd394de103150c013b3cb4708759
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Jun 2 16:05:23 2023 +0800
Improve consistency job finished check at pipeline E2E (#26009)
* Wait until the consistency check job has really finished in E2E
* Use Awaitility instead of thread sleeps
* Improve the E2E check for whether the increment task has finished
---
.../data/pipeline/cases/PipelineContainerComposer.java | 9 +++------
.../pipeline/cases/migration/AbstractMigrationE2EIT.java | 15 ++++++++++-----
.../migration/general/MySQLMigrationGeneralE2EIT.java | 5 ++++-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 13 ++++++++-----
4 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 83bc0b6f8fa..bde28c399e8 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -377,13 +377,14 @@ public final class PipelineContainerComposer implements AutoCloseable {
* @throws RuntimeException runtime exception
*/
public List<Map<String, Object>> queryForListWithLog(final String sql) {
+ log.info("Query SQL: {}", sql);
int retryNumber = 0;
while (retryNumber <= 3) {
try (Connection connection = proxyDataSource.getConnection()) {
ResultSet resultSet = connection.createStatement().executeQuery(sql);
return transformResultSetToList(resultSet);
} catch (final SQLException ex) {
- log.error("Data access error.", ex);
+ log.error("Data access error: ", ex);
}
Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
retryNumber++;
@@ -427,13 +428,9 @@ public final class PipelineContainerComposer implements AutoCloseable {
*
* @param distSQL dist SQL
* @return result
- * @throws InterruptedException interrupted exception
*/
// TODO use DAO to query via DistSQL
- public List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) throws InterruptedException {
- if (null != increaseTaskThread) {
- TimeUnit.SECONDS.timedJoin(increaseTaskThread, 30);
- }
+ public List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) {
for (int i = 0; i < 10; i++) {
List<Map<String, Object>> listJobStatus = queryForListWithLog(distSQL);
log.info("show status result: {}", listJobStatus);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index a6554ee7580..eecce00fac5 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -97,7 +98,8 @@ public abstract class AbstractMigrationE2EIT {
}
protected void createTargetOrderTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
- containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(), 2);
+ containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(), 0);
+ Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
}
protected void createTargetOrderTableEncryptRule(final PipelineContainerComposer containerComposer) throws SQLException {
@@ -105,7 +107,9 @@ public abstract class AbstractMigrationE2EIT {
}
protected void createTargetOrderItemTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
- containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(), 2);
+ containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(), 0);
+ Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
+ .until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order_item").isEmpty());
}
protected void startMigration(final PipelineContainerComposer containerComposer, final String sourceTableName, final String targetTableName) throws SQLException {
@@ -146,17 +150,18 @@ public abstract class AbstractMigrationE2EIT {
containerComposer.proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
// TODO Need to add after the stop then to start, can continue the consistency check from the previous progress
List<Map<String, Object>> resultList = Collections.emptyList();
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 30; i++) {
resultList = containerComposer.queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
if (resultList.isEmpty()) {
Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
continue;
}
List<String> checkEndTimeList = resultList.stream().map(map -> map.get("check_end_time").toString()).filter(each -> !Strings.isNullOrEmpty(each)).collect(Collectors.toList());
- if (checkEndTimeList.size() == resultList.size()) {
+ Set<String> finishedPercentages = resultList.stream().map(map -> map.get("finished_percentage").toString()).collect(Collectors.toSet());
+ if (checkEndTimeList.size() == resultList.size() && 1 == finishedPercentages.size() && finishedPercentages.contains("100")) {
break;
} else {
- Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
+ Awaitility.await().pollDelay(1L, TimeUnit.SECONDS).until(() -> true);
}
}
log.info("check job results: {}", resultList);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index bddf7db7799..52703047d55 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -79,6 +79,9 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
containerComposer.startIncrementTask(
new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
+ TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
+ containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+ containerComposer.assertProxyOrderRecordExist("t_order", 10000);
assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH");
String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
assertMigrationSuccessById(containerComposer, orderItemJobId, "DATA_MATCH");
@@ -94,7 +97,7 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
}
}
- private void assertMigrationSuccessById(final PipelineContainerComposer containerComposer, final String jobId, final String algorithmType) throws SQLException, InterruptedException {
+ private void assertMigrationSuccessById(final PipelineContainerComposer containerComposer, final String jobId, final String algorithmType) throws SQLException {
List<Map<String, Object>> jobStatus = containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index da030243900..4b79eca1492 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -79,9 +79,12 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> listJobId(containerComposer).size() > 0);
String jobId = getJobIdByTableName(containerComposer, "ds_0.test." + SOURCE_TABLE_NAME);
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- containerComposer.startIncrementTask(new E2EIncrementalTask(
- containerComposer.getSourceDataSource(), String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME),
- new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
+ String schemaTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
+ containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), schemaTableName, new SnowflakeKeyGenerateAlgorithm(),
+ containerComposer.getDatabaseType(), 20));
+ TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
+ containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
+ containerComposer.assertProxyOrderRecordExist(schemaTableName, 10000);
checkOrderMigration(containerComposer, jobId);
checkOrderItemMigration(containerComposer);
for (String each : listJobId(containerComposer)) {
@@ -94,7 +97,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
}
}
- private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException, InterruptedException {
+ private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException {
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(containerComposer, jobId);
// must refresh firstly, otherwise proxy can't get schema and table info
@@ -109,7 +112,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}
- private void checkOrderItemMigration(final PipelineContainerComposer containerComposer) throws SQLException, InterruptedException {
+ private void checkOrderItemMigration(final PipelineContainerComposer containerComposer) throws SQLException {
startMigrationWithSchema(containerComposer, "t_order_item", "t_order_item");
String jobId = getJobIdByTableName(containerComposer, "ds_0.test.t_order_item");
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));