You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/02 08:05:31 UTC

[shardingsphere] branch master updated: Improve consistency job finished check at pipeline E2E (#26009)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a3dc2ce4e1 Improve consistency job finished check at pipeline E2E (#26009)
3a3dc2ce4e1 is described below

commit 3a3dc2ce4e14fd394de103150c013b3cb4708759
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Jun 2 16:05:23 2023 +0800

    Improve consistency job finished check at pipeline E2E (#26009)
    
    * Wait until the consistency check job has really finished in E2E
    
    * Use Awaitility to replace Thread sleep
    
    * Improve the E2E increment task finished-check flag
---
 .../data/pipeline/cases/PipelineContainerComposer.java    |  9 +++------
 .../pipeline/cases/migration/AbstractMigrationE2EIT.java  | 15 ++++++++++-----
 .../migration/general/MySQLMigrationGeneralE2EIT.java     |  5 ++++-
 .../general/PostgreSQLMigrationGeneralE2EIT.java          | 13 ++++++++-----
 4 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 83bc0b6f8fa..bde28c399e8 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -377,13 +377,14 @@ public final class PipelineContainerComposer implements AutoCloseable {
      * @throws RuntimeException runtime exception
      */
     public List<Map<String, Object>> queryForListWithLog(final String sql) {
+        log.info("Query SQL: {}", sql);
         int retryNumber = 0;
         while (retryNumber <= 3) {
             try (Connection connection = proxyDataSource.getConnection()) {
                 ResultSet resultSet = connection.createStatement().executeQuery(sql);
                 return transformResultSetToList(resultSet);
             } catch (final SQLException ex) {
-                log.error("Data access error.", ex);
+                log.error("Data access error: ", ex);
             }
             Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
             retryNumber++;
@@ -427,13 +428,9 @@ public final class PipelineContainerComposer implements AutoCloseable {
      *
      * @param distSQL dist SQL
      * @return result
-     * @throws InterruptedException interrupted exception
      */
     // TODO use DAO to query via DistSQL
-    public List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) throws InterruptedException {
-        if (null != increaseTaskThread) {
-            TimeUnit.SECONDS.timedJoin(increaseTaskThread, 30);
-        }
+    public List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) {
         for (int i = 0; i < 10; i++) {
             List<Map<String, Object>> listJobStatus = queryForListWithLog(distSQL);
             log.info("show status result: {}", listJobStatus);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index a6554ee7580..eecce00fac5 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -97,7 +98,8 @@ public abstract class AbstractMigrationE2EIT {
     }
     
     protected void createTargetOrderTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
-        containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(), 2);
+        containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(), 0);
+        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
     }
     
     protected void createTargetOrderTableEncryptRule(final PipelineContainerComposer containerComposer) throws SQLException {
@@ -105,7 +107,9 @@ public abstract class AbstractMigrationE2EIT {
     }
     
     protected void createTargetOrderItemTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
-        containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(), 2);
+        containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(), 0);
+        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
+                .until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order_item").isEmpty());
     }
     
     protected void startMigration(final PipelineContainerComposer containerComposer, final String sourceTableName, final String targetTableName) throws SQLException {
@@ -146,17 +150,18 @@ public abstract class AbstractMigrationE2EIT {
         containerComposer.proxyExecuteWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
         // TODO Need to add after the stop then to start, can continue the consistency check from the previous progress
         List<Map<String, Object>> resultList = Collections.emptyList();
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 30; i++) {
             resultList = containerComposer.queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
             if (resultList.isEmpty()) {
                 Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
                 continue;
             }
             List<String> checkEndTimeList = resultList.stream().map(map -> map.get("check_end_time").toString()).filter(each -> !Strings.isNullOrEmpty(each)).collect(Collectors.toList());
-            if (checkEndTimeList.size() == resultList.size()) {
+            Set<String> finishedPercentages = resultList.stream().map(map -> map.get("finished_percentage").toString()).collect(Collectors.toSet());
+            if (checkEndTimeList.size() == resultList.size() && 1 == finishedPercentages.size() && finishedPercentages.contains("100")) {
                 break;
             } else {
-                Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
+                Awaitility.await().pollDelay(1L, TimeUnit.SECONDS).until(() -> true);
             }
         }
         log.info("check job results: {}", resultList);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index bddf7db7799..52703047d55 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -79,6 +79,9 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", orderJobId));
             containerComposer.startIncrementTask(
                     new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
+            TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
+            containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+            containerComposer.assertProxyOrderRecordExist("t_order", 10000);
             assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH");
             String orderItemJobId = getJobIdByTableName(containerComposer, "ds_0.t_order_item");
             assertMigrationSuccessById(containerComposer, orderItemJobId, "DATA_MATCH");
@@ -94,7 +97,7 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         }
     }
     
-    private void assertMigrationSuccessById(final PipelineContainerComposer containerComposer, final String jobId, final String algorithmType) throws SQLException, InterruptedException {
+    private void assertMigrationSuccessById(final PipelineContainerComposer containerComposer, final String jobId, final String algorithmType) throws SQLException {
         List<Map<String, Object>> jobStatus = containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         for (Map<String, Object> each : jobStatus) {
             assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index da030243900..4b79eca1492 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -79,9 +79,12 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> listJobId(containerComposer).size() > 0);
             String jobId = getJobIdByTableName(containerComposer, "ds_0.test." + SOURCE_TABLE_NAME);
             containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-            containerComposer.startIncrementTask(new E2EIncrementalTask(
-                    containerComposer.getSourceDataSource(), String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME),
-                    new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
+            String schemaTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
+            containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), schemaTableName, new SnowflakeKeyGenerateAlgorithm(),
+                    containerComposer.getDatabaseType(), 20));
+            TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
+            containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
+            containerComposer.assertProxyOrderRecordExist(schemaTableName, 10000);
             checkOrderMigration(containerComposer, jobId);
             checkOrderItemMigration(containerComposer);
             for (String each : listJobId(containerComposer)) {
@@ -94,7 +97,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         }
     }
     
-    private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException, InterruptedException {
+    private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException {
         containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         stopMigrationByJobId(containerComposer, jobId);
         // must refresh firstly, otherwise proxy can't get schema and table info
@@ -109,7 +112,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
     
-    private void checkOrderItemMigration(final PipelineContainerComposer containerComposer) throws SQLException, InterruptedException {
+    private void checkOrderItemMigration(final PipelineContainerComposer containerComposer) throws SQLException {
         startMigrationWithSchema(containerComposer, "t_order_item", "t_order_item");
         String jobId = getJobIdByTableName(containerComposer, "ds_0.test.t_order_item");
         containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));