You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/07/15 11:49:13 UTC

[shardingsphere] branch master updated: Add check job increment task finished method (#19240)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f982f3fdcd2 Add check job increment task finished method (#19240)
f982f3fdcd2 is described below

commit f982f3fdcd2d2fc43679e27c4b1be8d077566c12
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Fri Jul 15 19:49:00 2022 +0800

    Add check job increment task finished method (#19240)
---
 .../data/pipeline/cases/base/BaseITCase.java       | 32 ++++++++++++++++++++--
 .../cases/general/MySQLGeneralScalingIT.java       |  2 +-
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 10d9282249b..8fcb553abbe 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -342,10 +342,9 @@ public abstract class BaseITCase {
         }
         log.info("jobId: {}", jobId);
         Map<String, String> actualStatusMap = new HashMap<>(2, 1);
-        String showScalingStatus = String.format("SHOW SCALING STATUS %s", jobId);
         for (int i = 0; i < 15; i++) {
-            List<Map<String, Object>> showScalingStatusResMap = queryForListWithLog(showScalingStatus);
-            log.info("{}: {}", showScalingStatus, showScalingStatusResMap);
+            List<Map<String, Object>> showScalingStatusResMap = showScalingStatus(jobId);
+            log.info("show scaling status result: {}", showScalingStatusResMap);
             boolean finished = true;
             for (Map<String, Object> entry : showScalingStatusResMap) {
                 String status = entry.get("status").toString();
@@ -368,7 +367,19 @@ public abstract class BaseITCase {
         assertThat(actualStatusMap.values().stream().filter(StringUtils::isNotBlank).collect(Collectors.toSet()), is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));
     }
     
+    protected List<Map<String, Object>> showScalingStatus(final String jobId) {
+        return queryForListWithLog(String.format("SHOW SCALING STATUS %s", jobId));
+    }
+    
     protected void assertCheckScalingSuccess(final String jobId) {
+        for (int i = 0; i < 3; i++) {
+            if (checkJobIncrementTaskFinished(jobId)) {
+                break;
+            }
+            ThreadUtil.sleep(10, TimeUnit.SECONDS);
+        }
+        boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
+        log.info("second check job result: {}", secondCheckJobResult);
         stopScalingSourceWriting(jobId);
         assertStopScalingSourceWriting();
         List<Map<String, Object>> checkScalingResults = queryForListWithLog(String.format("CHECK SCALING %s BY TYPE (NAME=DATA_MATCH)", jobId));
@@ -378,6 +389,21 @@ public abstract class BaseITCase {
         }
     }
     
+    private boolean checkJobIncrementTaskFinished(final String jobId) {
+        List<Map<String, Object>> listScalingStatus = showScalingStatus(jobId);
+        log.info("listScalingStatus result: {}", listScalingStatus);
+        for (Map<String, Object> entry : listScalingStatus) {
+            if (JobStatus.EXECUTE_INCREMENTAL_TASK.name().equalsIgnoreCase(entry.get("status").toString())) {
+                return false;
+            }
+            int incrementalIdleSeconds = Integer.parseInt(entry.get("incremental_idle_seconds").toString());
+            if (incrementalIdleSeconds <= 10) {
+                return false;
+            }
+        }
+        return true;
+    }
+    
     protected void assertPreviewTableSuccess(final String tableName, final List<String> expect) {
         List<Map<String, Object>> actualResults = queryForListWithLog(String.format("PREVIEW SELECT COUNT(1) FROM %s", tableName));
         List<String> dataSourceNames = actualResults.stream().map(each -> String.valueOf(each.get("data_source_name"))).sorted().collect(Collectors.toList());
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
index e91e0120822..1c6a3067caf 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
@@ -90,7 +90,7 @@ public final class MySQLGeneralScalingIT extends BaseExtraSQLITCase {
         waitScalingFinished(jobId);
         stopScaling(jobId);
         // TODO need netty leak fixed
-//        getJdbcTemplate().update("INSERT INTO t_order (id,order_id,user_id,status) VALUES (?, ?, ?, ?)", keyGenerateAlgorithm.generateKey(), 1, 1, "afterStopScaling");
+        getJdbcTemplate().update("INSERT INTO t_order (id,order_id,user_id,status) VALUES (?, ?, ?, ?)", keyGenerateAlgorithm.generateKey(), 1, 1, "afterStopScaling");
         startScaling(jobId);
         assertCheckScalingSuccess(jobId);
         applyScaling(jobId);