You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/02/18 03:52:13 UTC
[shardingsphere] branch master updated: Add second level threshold for IDLE completion detect algorithm (#15480)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4335a55 Add second level threshold for IDLE completion detect algorithm (#15480)
4335a55 is described below
commit 4335a55ef5e73dd011960caf3d9f82bf41ae7c78
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Feb 18 11:51:11 2022 +0800
Add second level threshold for IDLE completion detect algorithm (#15480)
* Refactor PipelineSimpleLock
* Increase FinishedCheckJob execution frequency
* Add incremental-task-idle-second-threshold for IDLE completion detector
* Change scaling status column name to incremental_idle_seconds
* Increase job progress persist frequency
---
.../user-manual/shardingsphere-scaling/usage.cn.md | 2 +-
.../user-manual/shardingsphere-scaling/usage.en.md | 2 +-
.../ShowScalingJobStatusQueryResultSet.java | 4 +-
.../core/constant/DataPipelineConstants.java | 9 +++-
.../core/execute/FinishedCheckJobExecutor.java | 3 +-
.../pipeline/core/execute/PipelineJobExecutor.java | 4 +-
...DistributeLock.java => PipelineSimpleLock.java} | 48 +++++++++++++---------
.../rulealtered/RuleAlteredJobSchedulerCenter.java | 3 +-
.../scenario/rulealtered/RuleAlteredJobWorker.java | 4 +-
...dleRuleAlteredJobCompletionDetectAlgorithm.java | 26 ++++++++----
...uleAlteredJobCompletionDetectAlgorithmTest.java | 19 +++++----
11 files changed, 74 insertions(+), 50 deletions(-)
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
index 2643706..e9b9b7b 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
@@ -234,7 +234,7 @@ show scaling status {jobId};
```
mysql> show scaling status 660152090995195904;
+------+-------------+----------+-------------------------------+--------------------------+
-| item | data_source | status | inventory_finished_percentage | incremental_idle_minutes |
+| item | data_source | status | inventory_finished_percentage | incremental_idle_seconds |
+------+-------------+----------+-------------------------------+--------------------------+
| 0 | ds_1 | FINISHED | 100 | 2834 |
| 1 | ds_0 | FINISHED | 100 | 2834 |
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index 5d34402..e48d66a 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -236,7 +236,7 @@ Response:
```
mysql> show scaling status 660152090995195904;
+------+-------------+----------+-------------------------------+--------------------------+
-| item | data_source | status | inventory_finished_percentage | incremental_idle_minutes |
+| item | data_source | status | inventory_finished_percentage | incremental_idle_seconds |
+------+-------------+----------+-------------------------------+--------------------------+
| 0 | ds_1 | FINISHED | 100 | 2834 |
| 1 | ds_0 | FINISHED | 100 | 2834 |
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
index 254e954..9528d01 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
@@ -53,7 +53,7 @@ public final class ShowScalingJobStatusQueryResultSet implements DistSQLResultSe
list.add(entry.getValue().isActive() ? "true" : "false");
list.add(entry.getValue().getInventoryFinishedPercentage());
long latestActiveTimeMillis = entry.getValue().getIncrementalLatestActiveTimeMillis();
- list.add(latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - latestActiveTimeMillis) : 0);
+ list.add(latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0);
} else {
list.add("");
list.add("");
@@ -67,7 +67,7 @@ public final class ShowScalingJobStatusQueryResultSet implements DistSQLResultSe
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("item", "data_source", "status", "active", "inventory_finished_percentage", "incremental_idle_minutes");
+ return Arrays.asList("item", "data_source", "status", "active", "inventory_finished_percentage", "incremental_idle_seconds");
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
index 7933f9a..8a7e550 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
@@ -27,8 +27,13 @@ import lombok.NoArgsConstructor;
public final class DataPipelineConstants {
/**
+ * Data pipeline node name.
+ */
+ // TODO change to pipeline after job configuration structure completed
+ public static final String DATA_PIPELINE_NODE_NAME = "scaling";
+
+ /**
* Data pipeline root path.
*/
- // TODO change to /pipeline after job configuration structure completed
- public static final String DATA_PIPELINE_ROOT = "/scaling";
+ public static final String DATA_PIPELINE_ROOT = "/" + DATA_PIPELINE_NODE_NAME;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
index 2f287e9..5d5f6c3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
@@ -32,10 +32,11 @@ public final class FinishedCheckJobExecutor extends AbstractLifecycleExecutor {
private static final String JOB_NAME = "_finished_check";
- private static final String CRON_EXPRESSION = "0 * * * * ?";
+ private static final String CRON_EXPRESSION = "*/10 * * * * ?";
@Override
protected void doStart() {
+ // TODO refactor it and FinishedCheck after ejob support non-cron job
new ScheduleJobBootstrap(PipelineAPIFactory.getRegistryCenter(), new FinishedCheckJob(), createJobConfig()).schedule();
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 8fa0e7d..e1f7b71 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfigu
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.lock.ScalingSchemaNameDistributeLock;
+import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -72,7 +72,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
case ADDED:
case UPDATED:
JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
- if (ScalingSchemaNameDistributeLock.getInstance().tryLock(jobConfig.getWorkflowConfig().getSchemaName(), 1000)) {
+ if (PipelineSimpleLock.getInstance().tryLock(jobConfig.getWorkflowConfig().getSchemaName(), 1000)) {
execute(jobConfigPOJO);
}
break;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/ScalingSchemaNameDistributeLock.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
similarity index 60%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/ScalingSchemaNameDistributeLock.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
index 932f62f..ac9422b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/ScalingSchemaNameDistributeLock.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
@@ -18,38 +18,46 @@
package org.apache.shardingsphere.data.pipeline.core.lock;
import com.google.common.collect.Maps;
+import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Map;
+import java.util.Optional;
/**
- * Distributed locks added to the schema name during scaling.
+ * Pipeline simple lock.
*/
-public final class ScalingSchemaNameDistributeLock {
+// TODO extract interface and factory
+public final class PipelineSimpleLock {
- private static volatile ScalingSchemaNameDistributeLock instance;
+ private static volatile PipelineSimpleLock instance;
private final LockRegistryService lockRegistryService;
private final Map<String, Boolean> lockNameLockedMap;
- private ScalingSchemaNameDistributeLock() {
- ClusterPersistRepository repository = (ClusterPersistRepository) PipelineContext.getContextManager().getMetaDataContexts().getMetaDataPersistService().get().getRepository();
+ private PipelineSimpleLock() {
+ Optional<MetaDataPersistService> persistServiceOptional = PipelineContext.getContextManager().getMetaDataContexts().getMetaDataPersistService();
+ persistServiceOptional.orElseThrow(() -> new RuntimeException("Could not get metadata persist service"));
+ // TODO Use PersistRepository later
+ ClusterPersistRepository repository = (ClusterPersistRepository) persistServiceOptional.get().getRepository();
lockRegistryService = new LockRegistryService(repository);
lockNameLockedMap = Maps.newConcurrentMap();
}
/**
- * get ScalingSchemaNameDistributeLock instance.
- * @return ScalingSchemaNameDistributeLock
+ * Get instance.
+ *
+ * @return instance
*/
- public static ScalingSchemaNameDistributeLock getInstance() {
+ public static PipelineSimpleLock getInstance() {
if (null == instance) {
- synchronized (ScalingSchemaNameDistributeLock.class) {
+ synchronized (PipelineSimpleLock.class) {
if (null == instance) {
- instance = new ScalingSchemaNameDistributeLock();
+ instance = new PipelineSimpleLock();
}
}
}
@@ -57,21 +65,23 @@ public final class ScalingSchemaNameDistributeLock {
}
/**
- * Try to get lock.
+ * Try to lock.
+ *
* @param lockName lock name
- * @param timeoutMilliseconds the maximum time in milliseconds to acquire lock
- * @return true if get the lock, false if not
+ * @param timeoutMills the maximum time in milliseconds to acquire lock
+ * @return true if lock got, else false
*/
- public boolean tryLock(final String lockName, final long timeoutMilliseconds) {
- boolean locked = lockRegistryService.tryLock(decorateLockName(lockName), timeoutMilliseconds);
- if (locked) {
+ public boolean tryLock(final String lockName, final long timeoutMills) {
+ boolean result = lockRegistryService.tryLock(decorateLockName(lockName), timeoutMills);
+ if (result) {
lockNameLockedMap.put(lockName, true);
}
- return locked;
+ return result;
}
/**
* Release lock.
+ *
* @param lockName lock name
*/
public void releaseLock(final String lockName) {
@@ -81,7 +91,7 @@ public final class ScalingSchemaNameDistributeLock {
}
}
- private String decorateLockName(final String schemaName) {
- return "Scaling-" + schemaName;
+ private String decorateLockName(final String lockName) {
+ return DataPipelineConstants.DATA_PIPELINE_NODE_NAME + "-" + lockName;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 243fe5b..fdaa411 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -46,8 +46,7 @@ public final class RuleAlteredJobSchedulerCenter {
private static final GovernanceRepositoryAPI REGISTRY_REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
static {
- // TODO it's too slow to persist job progress
- JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 1, 1, TimeUnit.MINUTES);
+ JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 10, 10, TimeUnit.SECONDS);
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index b13fd21..4c83bfe 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -35,7 +35,7 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
-import org.apache.shardingsphere.data.pipeline.core.lock.ScalingSchemaNameDistributeLock;
+import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
@@ -326,6 +326,6 @@ public final class RuleAlteredJobWorker {
*/
@Subscribe
public void scalingReleaseSchemaNameLock(final ScalingReleaseSchemaNameLockEvent event) {
- ScalingSchemaNameDistributeLock.getInstance().releaseLock(event.getSchemaName());
+ PipelineSimpleLock.getInstance().releaseLock(event.getSchemaName());
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
index c6af445..d1edd90 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
@@ -35,12 +35,16 @@ import java.util.stream.Collectors;
*/
public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> {
- public static final String IDLE_THRESHOLD_KEY = "incremental-task-idle-minute-threshold";
+ public static final String IDLE_MINUTE_THRESHOLD_KEY = "incremental-task-idle-minute-threshold";
+
+ public static final String IDLE_SECOND_THRESHOLD_KEY = "incremental-task-idle-second-threshold";
+
+ public static final long DEFAULT_IDLE_SECOND_THRESHOLD = TimeUnit.MINUTES.toSeconds(30);
private Properties props = new Properties();
@Getter
- private long incrementalTaskIdleMinuteThreshold = 30;
+ private long incrementalTaskIdleSecondThreshold = DEFAULT_IDLE_SECOND_THRESHOLD;
@Override
public Properties getProps() {
@@ -54,9 +58,13 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
@Override
public void init() {
- Preconditions.checkArgument(props.containsKey(IDLE_THRESHOLD_KEY), "%s can not be null.", IDLE_THRESHOLD_KEY);
- incrementalTaskIdleMinuteThreshold = Long.parseLong(props.getProperty(IDLE_THRESHOLD_KEY));
- Preconditions.checkArgument(incrementalTaskIdleMinuteThreshold > 0, "%s value must be positive.", IDLE_THRESHOLD_KEY);
+ Preconditions.checkArgument(props.containsKey(IDLE_MINUTE_THRESHOLD_KEY) || props.containsKey(IDLE_SECOND_THRESHOLD_KEY), "incremental task idle threshold can not be null.");
+ if (props.containsKey(IDLE_SECOND_THRESHOLD_KEY)) {
+ incrementalTaskIdleSecondThreshold = Long.parseLong(props.getProperty(IDLE_SECOND_THRESHOLD_KEY));
+ } else {
+ incrementalTaskIdleSecondThreshold = TimeUnit.MINUTES.toSeconds(Long.parseLong(props.getProperty(IDLE_MINUTE_THRESHOLD_KEY)));
+ }
+ Preconditions.checkArgument(incrementalTaskIdleSecondThreshold > 0, "incremental task idle threshold must be positive.");
}
@Override
@@ -74,8 +82,8 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
if (!isAllInventoryTasksCompleted(jobProgresses)) {
return false;
}
- Collection<Long> incrementalTasksIdleMinutes = getIncrementalTasksIdleMinutes(jobProgresses);
- return incrementalTasksIdleMinutes.stream().allMatch(idleMinute -> idleMinute >= incrementalTaskIdleMinuteThreshold);
+ Collection<Long> incrementalTasksIdleSeconds = getIncrementalTasksIdleSeconds(jobProgresses);
+ return incrementalTasksIdleSeconds.stream().allMatch(each -> each >= incrementalTaskIdleSecondThreshold);
}
private static boolean isAllProgressesFilled(final int jobShardingCount, final Collection<JobProgress> jobProgresses) {
@@ -89,12 +97,12 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
.allMatch(each -> each.getPosition() instanceof FinishedPosition);
}
- private static Collection<Long> getIncrementalTasksIdleMinutes(final Collection<JobProgress> jobProgresses) {
+ private static Collection<Long> getIncrementalTasksIdleSeconds(final Collection<JobProgress> jobProgresses) {
long currentTimeMillis = System.currentTimeMillis();
return jobProgresses.stream().flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
.map(each -> {
long latestActiveTimeMillis = each.getIncrementalTaskDelay().getLatestActiveTimeMillis();
- return latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - latestActiveTimeMillis) : 0;
+ return latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0;
})
.collect(Collectors.toList());
}
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
index ce34975..62a4e03 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
@@ -62,28 +62,28 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@Test(expected = IllegalArgumentException.class)
public void assertInitFailNoIdleThresholdKey() {
- when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(false);
+ when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(false);
detectAlgorithm.init();
}
@Test(expected = IllegalArgumentException.class)
public void assertInitFailInvalidIdleThresholdKey() {
- when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
- when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("@");
+ when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(true);
+ when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn("@");
detectAlgorithm.init();
}
@Test(expected = IllegalArgumentException.class)
public void assertInitFailNegativeIdleThresholdKey() {
- when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
- when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("-8");
+ when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(true);
+ when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn("-8");
detectAlgorithm.init();
}
@Test
public void assertInitSuccess() {
- when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
- when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("4");
+ when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(true);
+ when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn("4");
detectAlgorithm.init();
}
@@ -115,7 +115,7 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@Test
public void assertTrueWhenIdleMinutesNotReach() {
int jobShardingCount = 1;
- long latestActiveTimeMillis = System.currentTimeMillis() - ThreadLocalRandom.current().nextLong(1, detectAlgorithm.getIncrementalTaskIdleMinuteThreshold());
+ long latestActiveTimeMillis = System.currentTimeMillis() - ThreadLocalRandom.current().nextLong(1, detectAlgorithm.getIncrementalTaskIdleSecondThreshold());
JobProgress jobProgress = createJobProgress(latestActiveTimeMillis);
Collection<JobProgress> jobProgresses = Collections.singleton(jobProgress);
RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobProgresses);
@@ -138,7 +138,8 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
@Test
public void assertTrueWhenJobAlmostCompleted() {
int jobShardingCount = 1;
- long latestActiveTimeMillis = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(detectAlgorithm.getIncrementalTaskIdleMinuteThreshold() + 5);
+ long latestActiveTimeMillis = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(detectAlgorithm.getIncrementalTaskIdleSecondThreshold()
+ + IdleRuleAlteredJobCompletionDetectAlgorithm.DEFAULT_IDLE_SECOND_THRESHOLD);
JobProgress jobProgress = createJobProgress(latestActiveTimeMillis);
Collection<JobProgress> jobProgresses = Collections.singleton(jobProgress);
RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobProgresses);