You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/02/18 03:52:13 UTC

[shardingsphere] branch master updated: Add second level threshold for IDLE completion detect algorithm (#15480)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4335a55  Add second level threshold for IDLE completion detect algorithm (#15480)
4335a55 is described below

commit 4335a55ef5e73dd011960caf3d9f82bf41ae7c78
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Fri Feb 18 11:51:11 2022 +0800

    Add second level threshold for IDLE completion detect algorithm (#15480)
    
    * Refactor PipelineSimpleLock
    
    * Increase FinishedCheckJob execution frequency
    
    * Add incremental-task-idle-second-threshold for IDLE completion detector
    
    * Change scaling status column name to incremental_idle_seconds
    
    * Increase job progress persist frequency
---
 .../user-manual/shardingsphere-scaling/usage.cn.md |  2 +-
 .../user-manual/shardingsphere-scaling/usage.en.md |  2 +-
 .../ShowScalingJobStatusQueryResultSet.java        |  4 +-
 .../core/constant/DataPipelineConstants.java       |  9 +++-
 .../core/execute/FinishedCheckJobExecutor.java     |  3 +-
 .../pipeline/core/execute/PipelineJobExecutor.java |  4 +-
 ...DistributeLock.java => PipelineSimpleLock.java} | 48 +++++++++++++---------
 .../rulealtered/RuleAlteredJobSchedulerCenter.java |  3 +-
 .../scenario/rulealtered/RuleAlteredJobWorker.java |  4 +-
 ...dleRuleAlteredJobCompletionDetectAlgorithm.java | 26 ++++++++----
 ...uleAlteredJobCompletionDetectAlgorithmTest.java | 19 +++++----
 11 files changed, 74 insertions(+), 50 deletions(-)

diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
index 2643706..e9b9b7b 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
@@ -234,7 +234,7 @@ show scaling status {jobId};
 ```
 mysql> show scaling status 660152090995195904;
 +------+-------------+----------+-------------------------------+--------------------------+
-| item | data_source | status   | inventory_finished_percentage | incremental_idle_minutes |
+| item | data_source | status   | inventory_finished_percentage | incremental_idle_seconds |
 +------+-------------+----------+-------------------------------+--------------------------+
 | 0    | ds_1        | FINISHED | 100                           | 2834                     |
 | 1    | ds_0        | FINISHED | 100                           | 2834                     |
diff --git a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index 5d34402..e48d66a 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -236,7 +236,7 @@ Response:
 ```
 mysql> show scaling status 660152090995195904;
 +------+-------------+----------+-------------------------------+--------------------------+
-| item | data_source | status   | inventory_finished_percentage | incremental_idle_minutes |
+| item | data_source | status   | inventory_finished_percentage | incremental_idle_seconds |
 +------+-------------+----------+-------------------------------+--------------------------+
 | 0    | ds_1        | FINISHED | 100                           | 2834                     |
 | 1    | ds_0        | FINISHED | 100                           | 2834                     |
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
index 254e954..9528d01 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/ShowScalingJobStatusQueryResultSet.java
@@ -53,7 +53,7 @@ public final class ShowScalingJobStatusQueryResultSet implements DistSQLResultSe
                         list.add(entry.getValue().isActive() ? "true" : "false");
                         list.add(entry.getValue().getInventoryFinishedPercentage());
                         long latestActiveTimeMillis = entry.getValue().getIncrementalLatestActiveTimeMillis();
-                        list.add(latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - latestActiveTimeMillis) : 0);
+                        list.add(latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0);
                     } else {
                         list.add("");
                         list.add("");
@@ -67,7 +67,7 @@ public final class ShowScalingJobStatusQueryResultSet implements DistSQLResultSe
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("item", "data_source", "status", "active", "inventory_finished_percentage", "incremental_idle_minutes");
+        return Arrays.asList("item", "data_source", "status", "active", "inventory_finished_percentage", "incremental_idle_seconds");
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
index 7933f9a..8a7e550 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/constant/DataPipelineConstants.java
@@ -27,8 +27,13 @@ import lombok.NoArgsConstructor;
 public final class DataPipelineConstants {
     
     /**
+     * Data pipeline node name.
+     */
+    // TODO change to pipeline after job configuration structure completed
+    public static final String DATA_PIPELINE_NODE_NAME = "scaling";
+    
+    /**
      * Data pipeline root path.
      */
-    // TODO change to /pipeline after job configuration structure completed
-    public static final String DATA_PIPELINE_ROOT = "/scaling";
+    public static final String DATA_PIPELINE_ROOT = "/" + DATA_PIPELINE_NODE_NAME;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
index 2f287e9..5d5f6c3 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/FinishedCheckJobExecutor.java
@@ -32,10 +32,11 @@ public final class FinishedCheckJobExecutor extends AbstractLifecycleExecutor {
     
     private static final String JOB_NAME = "_finished_check";
     
-    private static final String CRON_EXPRESSION = "0 * * * * ?";
+    private static final String CRON_EXPRESSION = "*/10 * * * * ?";
     
     @Override
     protected void doStart() {
+        // TODO refactor it and FinishedCheck after ejob support non-cron job
         new ScheduleJobBootstrap(PipelineAPIFactory.getRegistryCenter(), new FinishedCheckJob(), createJobConfig()).schedule();
     }
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 8fa0e7d..e1f7b71 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -23,7 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.JobConfigu
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.lock.ScalingSchemaNameDistributeLock;
+import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -72,7 +72,7 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
                 case ADDED:
                 case UPDATED:
                     JobConfiguration jobConfig = YamlEngine.unmarshal(jobConfigPOJO.getJobParameter(), JobConfiguration.class, true);
-                    if (ScalingSchemaNameDistributeLock.getInstance().tryLock(jobConfig.getWorkflowConfig().getSchemaName(), 1000)) {
+                    if (PipelineSimpleLock.getInstance().tryLock(jobConfig.getWorkflowConfig().getSchemaName(), 1000)) {
                         execute(jobConfigPOJO);
                     }
                     break;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/ScalingSchemaNameDistributeLock.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
similarity index 60%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/ScalingSchemaNameDistributeLock.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
index 932f62f..ac9422b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/ScalingSchemaNameDistributeLock.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/lock/PipelineSimpleLock.java
@@ -18,38 +18,46 @@
 package org.apache.shardingsphere.data.pipeline.core.lock;
 
 import com.google.common.collect.Maps;
+import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.service.LockRegistryService;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
- * Distributed locks added to the schema name during scaling.
+ * Pipeline simple lock.
  */
-public final class ScalingSchemaNameDistributeLock {
+// TODO extract interface and factory
+public final class PipelineSimpleLock {
     
-    private static volatile ScalingSchemaNameDistributeLock instance;
+    private static volatile PipelineSimpleLock instance;
     
     private final LockRegistryService lockRegistryService;
     
     private final Map<String, Boolean> lockNameLockedMap;
     
-    private ScalingSchemaNameDistributeLock() {
-        ClusterPersistRepository repository = (ClusterPersistRepository) PipelineContext.getContextManager().getMetaDataContexts().getMetaDataPersistService().get().getRepository();
+    private PipelineSimpleLock() {
+        Optional<MetaDataPersistService> persistServiceOptional = PipelineContext.getContextManager().getMetaDataContexts().getMetaDataPersistService();
+        persistServiceOptional.orElseThrow(() -> new RuntimeException("Could not get metadata persist service"));
+        // TODO Use PersistRepository later
+        ClusterPersistRepository repository = (ClusterPersistRepository) persistServiceOptional.get().getRepository();
         lockRegistryService = new LockRegistryService(repository);
         lockNameLockedMap = Maps.newConcurrentMap();
     }
     
     /**
-     * get ScalingSchemaNameDistributeLock instance.
-     * @return ScalingSchemaNameDistributeLock
+     * Get instance.
+     *
+     * @return instance
      */
-    public static ScalingSchemaNameDistributeLock getInstance() {
+    public static PipelineSimpleLock getInstance() {
         if (null == instance) {
-            synchronized (ScalingSchemaNameDistributeLock.class) {
+            synchronized (PipelineSimpleLock.class) {
                 if (null == instance) {
-                    instance = new ScalingSchemaNameDistributeLock();
+                    instance = new PipelineSimpleLock();
                 }
             }
         }
@@ -57,21 +65,23 @@ public final class ScalingSchemaNameDistributeLock {
     }
     
     /**
-     * Try to get lock.
+     * Try to lock.
+     *
      * @param lockName lock name
-     * @param timeoutMilliseconds the maximum time in milliseconds to acquire lock
-     * @return true if get the lock, false if not
+     * @param timeoutMills the maximum time in milliseconds to acquire lock
+     * @return true if lock got, else false
      */
-    public boolean tryLock(final String lockName, final long timeoutMilliseconds) {
-        boolean locked = lockRegistryService.tryLock(decorateLockName(lockName), timeoutMilliseconds);
-        if (locked) {
+    public boolean tryLock(final String lockName, final long timeoutMills) {
+        boolean result = lockRegistryService.tryLock(decorateLockName(lockName), timeoutMills);
+        if (result) {
             lockNameLockedMap.put(lockName, true);
         }
-        return locked;
+        return result;
     }
     
     /**
      * Release lock.
+     *
      * @param lockName lock name
      */
     public void releaseLock(final String lockName) {
@@ -81,7 +91,7 @@ public final class ScalingSchemaNameDistributeLock {
         }
     }
     
-    private String decorateLockName(final String schemaName) {
-        return "Scaling-" + schemaName;
+    private String decorateLockName(final String lockName) {
+        return DataPipelineConstants.DATA_PIPELINE_NODE_NAME + "-" + lockName;
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 243fe5b..fdaa411 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -46,8 +46,7 @@ public final class RuleAlteredJobSchedulerCenter {
     private static final GovernanceRepositoryAPI REGISTRY_REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
     
     static {
-        // TODO it's too slow to persist job progress
-        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 1, 1, TimeUnit.MINUTES);
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 10, 10, TimeUnit.SECONDS);
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index b13fd21..4c83bfe 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -35,7 +35,7 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
 import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
-import org.apache.shardingsphere.data.pipeline.core.lock.ScalingSchemaNameDistributeLock;
+import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
@@ -326,6 +326,6 @@ public final class RuleAlteredJobWorker {
      */
     @Subscribe
     public void scalingReleaseSchemaNameLock(final ScalingReleaseSchemaNameLockEvent event) {
-        ScalingSchemaNameDistributeLock.getInstance().releaseLock(event.getSchemaName());
+        PipelineSimpleLock.getInstance().releaseLock(event.getSchemaName());
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
index c6af445..d1edd90 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/IdleRuleAlteredJobCompletionDetectAlgorithm.java
@@ -35,12 +35,16 @@ import java.util.stream.Collectors;
  */
 public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> {
     
-    public static final String IDLE_THRESHOLD_KEY = "incremental-task-idle-minute-threshold";
+    public static final String IDLE_MINUTE_THRESHOLD_KEY = "incremental-task-idle-minute-threshold";
+    
+    public static final String IDLE_SECOND_THRESHOLD_KEY = "incremental-task-idle-second-threshold";
+    
+    public static final long DEFAULT_IDLE_SECOND_THRESHOLD = TimeUnit.MINUTES.toSeconds(30);
     
     private Properties props = new Properties();
     
     @Getter
-    private long incrementalTaskIdleMinuteThreshold = 30;
+    private long incrementalTaskIdleSecondThreshold = DEFAULT_IDLE_SECOND_THRESHOLD;
     
     @Override
     public Properties getProps() {
@@ -54,9 +58,13 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
     
     @Override
     public void init() {
-        Preconditions.checkArgument(props.containsKey(IDLE_THRESHOLD_KEY), "%s can not be null.", IDLE_THRESHOLD_KEY);
-        incrementalTaskIdleMinuteThreshold = Long.parseLong(props.getProperty(IDLE_THRESHOLD_KEY));
-        Preconditions.checkArgument(incrementalTaskIdleMinuteThreshold > 0, "%s value must be positive.", IDLE_THRESHOLD_KEY);
+        Preconditions.checkArgument(props.containsKey(IDLE_MINUTE_THRESHOLD_KEY) || props.containsKey(IDLE_SECOND_THRESHOLD_KEY), "incremental task idle threshold can not be null.");
+        if (props.containsKey(IDLE_SECOND_THRESHOLD_KEY)) {
+            incrementalTaskIdleSecondThreshold = Long.parseLong(props.getProperty(IDLE_SECOND_THRESHOLD_KEY));
+        } else {
+            incrementalTaskIdleSecondThreshold = TimeUnit.MINUTES.toSeconds(Long.parseLong(props.getProperty(IDLE_MINUTE_THRESHOLD_KEY)));
+        }
+        Preconditions.checkArgument(incrementalTaskIdleSecondThreshold > 0, "incremental task idle threshold must be positive.");
     }
     
     @Override
@@ -74,8 +82,8 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
         if (!isAllInventoryTasksCompleted(jobProgresses)) {
             return false;
         }
-        Collection<Long> incrementalTasksIdleMinutes = getIncrementalTasksIdleMinutes(jobProgresses);
-        return incrementalTasksIdleMinutes.stream().allMatch(idleMinute -> idleMinute >= incrementalTaskIdleMinuteThreshold);
+        Collection<Long> incrementalTasksIdleSeconds = getIncrementalTasksIdleSeconds(jobProgresses);
+        return incrementalTasksIdleSeconds.stream().allMatch(each -> each >= incrementalTaskIdleSecondThreshold);
     }
     
     private static boolean isAllProgressesFilled(final int jobShardingCount, final Collection<JobProgress> jobProgresses) {
@@ -89,12 +97,12 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithm implements JobCom
                 .allMatch(each -> each.getPosition() instanceof FinishedPosition);
     }
     
-    private static Collection<Long> getIncrementalTasksIdleMinutes(final Collection<JobProgress> jobProgresses) {
+    private static Collection<Long> getIncrementalTasksIdleSeconds(final Collection<JobProgress> jobProgresses) {
         long currentTimeMillis = System.currentTimeMillis();
         return jobProgresses.stream().flatMap(each -> each.getIncrementalTaskProgressMap().values().stream())
                 .map(each -> {
                     long latestActiveTimeMillis = each.getIncrementalTaskDelay().getLatestActiveTimeMillis();
-                    return latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toMinutes(currentTimeMillis - latestActiveTimeMillis) : 0;
+                    return latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0;
                 })
                 .collect(Collectors.toList());
     }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
index ce34975..62a4e03 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/IdleRuleAlteredJobCompletionDetectAlgorithmTest.java
@@ -62,28 +62,28 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
     
     @Test(expected = IllegalArgumentException.class)
     public void assertInitFailNoIdleThresholdKey() {
-        when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(false);
+        when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(false);
         detectAlgorithm.init();
     }
     
     @Test(expected = IllegalArgumentException.class)
     public void assertInitFailInvalidIdleThresholdKey() {
-        when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-        when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("@");
+        when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(true);
+        when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn("@");
         detectAlgorithm.init();
     }
     
     @Test(expected = IllegalArgumentException.class)
     public void assertInitFailNegativeIdleThresholdKey() {
-        when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-        when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("-8");
+        when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(true);
+        when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn("-8");
         detectAlgorithm.init();
     }
     
     @Test
     public void assertInitSuccess() {
-        when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn(true);
-        when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_THRESHOLD_KEY)).thenReturn("4");
+        when(propsMock.containsKey(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn(true);
+        when(propsMock.getProperty(IdleRuleAlteredJobCompletionDetectAlgorithm.IDLE_MINUTE_THRESHOLD_KEY)).thenReturn("4");
         detectAlgorithm.init();
     }
     
@@ -115,7 +115,7 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
     @Test
     public void assertTrueWhenIdleMinutesNotReach() {
         int jobShardingCount = 1;
-        long latestActiveTimeMillis = System.currentTimeMillis() - ThreadLocalRandom.current().nextLong(1, detectAlgorithm.getIncrementalTaskIdleMinuteThreshold());
+        long latestActiveTimeMillis = System.currentTimeMillis() - ThreadLocalRandom.current().nextLong(1, detectAlgorithm.getIncrementalTaskIdleSecondThreshold());
         JobProgress jobProgress = createJobProgress(latestActiveTimeMillis);
         Collection<JobProgress> jobProgresses = Collections.singleton(jobProgress);
         RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobProgresses);
@@ -138,7 +138,8 @@ public final class IdleRuleAlteredJobCompletionDetectAlgorithmTest {
     @Test
     public void assertTrueWhenJobAlmostCompleted() {
         int jobShardingCount = 1;
-        long latestActiveTimeMillis = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(detectAlgorithm.getIncrementalTaskIdleMinuteThreshold() + 5);
+        long latestActiveTimeMillis = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(detectAlgorithm.getIncrementalTaskIdleSecondThreshold()
+                + IdleRuleAlteredJobCompletionDetectAlgorithm.DEFAULT_IDLE_SECOND_THRESHOLD);
         JobProgress jobProgress = createJobProgress(latestActiveTimeMillis);
         Collection<JobProgress> jobProgresses = Collections.singleton(jobProgress);
         RuleAlteredJobAlmostCompletedParameter parameter = new RuleAlteredJobAlmostCompletedParameter(jobShardingCount, jobProgresses);