You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/07/15 11:49:13 UTC
[shardingsphere] branch master updated: Add check job increment task finished method (#19240)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f982f3fdcd2 Add check job increment task finished method (#19240)
f982f3fdcd2 is described below
commit f982f3fdcd2d2fc43679e27c4b1be8d077566c12
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Fri Jul 15 19:49:00 2022 +0800
Add check job increment task finished method (#19240)
---
.../data/pipeline/cases/base/BaseITCase.java | 32 ++++++++++++++++++++--
.../cases/general/MySQLGeneralScalingIT.java | 2 +-
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 10d9282249b..8fcb553abbe 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -342,10 +342,9 @@ public abstract class BaseITCase {
}
log.info("jobId: {}", jobId);
Map<String, String> actualStatusMap = new HashMap<>(2, 1);
- String showScalingStatus = String.format("SHOW SCALING STATUS %s", jobId);
for (int i = 0; i < 15; i++) {
- List<Map<String, Object>> showScalingStatusResMap = queryForListWithLog(showScalingStatus);
- log.info("{}: {}", showScalingStatus, showScalingStatusResMap);
+ List<Map<String, Object>> showScalingStatusResMap = showScalingStatus(jobId);
+ log.info("show scaling status result: {}", showScalingStatusResMap);
boolean finished = true;
for (Map<String, Object> entry : showScalingStatusResMap) {
String status = entry.get("status").toString();
@@ -368,7 +367,19 @@ public abstract class BaseITCase {
assertThat(actualStatusMap.values().stream().filter(StringUtils::isNotBlank).collect(Collectors.toSet()), is(Collections.singleton(JobStatus.EXECUTE_INCREMENTAL_TASK.name())));
}
+ protected List<Map<String, Object>> showScalingStatus(final String jobId) { // Issues "SHOW SCALING STATUS <jobId>" and returns the result as a list of maps.
+ return queryForListWithLog(String.format("SHOW SCALING STATUS %s", jobId)); // queryForListWithLog is defined outside this diff; presumably it executes the statement and logs it - confirm
+ }
+
protected void assertCheckScalingSuccess(final String jobId) {
+ for (int i = 0; i < 3; i++) {
+ if (checkJobIncrementTaskFinished(jobId)) {
+ break;
+ }
+ ThreadUtil.sleep(10, TimeUnit.SECONDS);
+ }
+ boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
+ log.info("second check job result: {}", secondCheckJobResult);
stopScalingSourceWriting(jobId);
assertStopScalingSourceWriting();
List<Map<String, Object>> checkScalingResults = queryForListWithLog(String.format("CHECK SCALING %s BY TYPE (NAME=DATA_MATCH)", jobId));
@@ -378,6 +389,21 @@ public abstract class BaseITCase {
}
}
+ private boolean checkJobIncrementTaskFinished(final String jobId) { // Queries the job status once; returns true only when no returned entry hits either early return below.
+ List<Map<String, Object>> listScalingStatus = showScalingStatus(jobId);
+ log.info("listScalingStatus result: {}", listScalingStatus);
+ for (Map<String, Object> entry : listScalingStatus) {
+ if (JobStatus.EXECUTE_INCREMENTAL_TASK.name().equalsIgnoreCase(entry.get("status").toString())) { // NOTE(review): an entry in EXECUTE_INCREMENTAL_TASK makes this return false, yet the assertion earlier in this class expects every entry to be in that status - confirm the condition is not inverted
+ return false;
+ }
+ int incrementalIdleSeconds = Integer.parseInt(entry.get("incremental_idle_seconds").toString()); // NullPointerException if the key is absent; NumberFormatException if the value is not an integer
+ if (incrementalIdleSeconds <= 10) { // an entry idle for 10 seconds or less makes the check fail
+ return false;
+ }
+ }
+ return true; // also reached when the status list is empty
+ }
+
protected void assertPreviewTableSuccess(final String tableName, final List<String> expect) {
List<Map<String, Object>> actualResults = queryForListWithLog(String.format("PREVIEW SELECT COUNT(1) FROM %s", tableName));
List<String> dataSourceNames = actualResults.stream().map(each -> String.valueOf(each.get("data_source_name"))).sorted().collect(Collectors.toList());
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
index e91e0120822..1c6a3067caf 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLGeneralScalingIT.java
@@ -90,7 +90,7 @@ public final class MySQLGeneralScalingIT extends BaseExtraSQLITCase {
waitScalingFinished(jobId);
stopScaling(jobId);
// TODO need netty leak fixed
-// getJdbcTemplate().update("INSERT INTO t_order (id,order_id,user_id,status) VALUES (?, ?, ?, ?)", keyGenerateAlgorithm.generateKey(), 1, 1, "afterStopScaling");
+ getJdbcTemplate().update("INSERT INTO t_order (id,order_id,user_id,status) VALUES (?, ?, ?, ?)", keyGenerateAlgorithm.generateKey(), 1, 1, "afterStopScaling");
startScaling(jobId);
assertCheckScalingSuccess(jobId);
applyScaling(jobId);