You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2021/01/16 16:04:40 UTC

[shardingsphere] branch master updated: Move governance-core/scaling into scaling-core (#9055)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5be9b1a  Move governance-core/scaling into scaling-core (#9055)
5be9b1a is described below

commit 5be9b1a8e5ed8aac9aa15c94bde987d16902b93d
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Sun Jan 17 00:04:10 2021 +0800

    Move governance-core/scaling into scaling-core (#9055)
    
    * move governance-core/scaling into scaling-core
    
    Co-authored-by: qiulu3 <Lucas209910>
---
 .../shardingsphere-governance-core/pom.xml         |  5 --
 .../model/rule/RuleConfigurationsAlteredEvent.java | 14 ++--
 .../scaling/callback/ScalingResultCallback.java    | 62 ----------------
 .../impl/GovernanceBootstrapInitializer.java       |  5 +-
 .../scaling/ScalingWorkerBootstrap.java            |  9 +--
 .../shardingsphere-scaling-core/pom.xml            |  5 ++
 .../scaling/core/config/JobConfiguration.java      |  2 +-
 .../WorkflowConfiguration.java}                    | 30 ++++----
 .../scaling/core/constant/ScalingConstant.java     | 10 +++
 .../core/service/AbstractScalingJobService.java    | 69 +++---------------
 .../scaling/core/service/ScalingJobService.java    |  9 +--
 .../scaling/core/spi/ScalingWorker.java            |  7 --
 .../scaling/core/spi/ScalingWorkerLoader.java      |  8 +--
 .../scaling/core/utils/ScalingTaskUtil.java        |  2 +-
 .../core/workflow}/ScalingServiceHolder.java       | 57 ++++++---------
 .../impl/DistributedScalingJobServiceTest.java     | 22 ++----
 .../scaling/core/spi/ScalingWorkerLoaderTest.java  | 52 --------------
 ...e.shardingsphere.scaling.core.spi.ScalingWorker | 18 -----
 .../elasticjob/ElasticJobScalingWorker.java        |  7 +-
 .../scaling/elasticjob/FinishedCheckWorker.java}   | 26 +++++--
 .../scaling/elasticjob/job/FinishedCheckJob.java   | 84 ++++++++++++++++++++++
 ...e.shardingsphere.scaling.core.spi.ScalingWorker |  1 +
 22 files changed, 190 insertions(+), 314 deletions(-)

diff --git a/shardingsphere-governance/shardingsphere-governance-core/pom.xml b/shardingsphere-governance/shardingsphere-governance-core/pom.xml
index f59c449..465500e 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/pom.xml
+++ b/shardingsphere-governance/shardingsphere-governance-core/pom.xml
@@ -58,11 +58,6 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.shardingsphere</groupId>
-            <artifactId>shardingsphere-scaling-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-dbcp2</artifactId>
         </dependency>
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/rule/RuleConfigurationsAlteredEvent.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/rule/RuleConfigurationsAlteredEvent.java
index abc668b..95ac061 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/rule/RuleConfigurationsAlteredEvent.java
+++ b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/event/model/rule/RuleConfigurationsAlteredEvent.java
@@ -29,11 +29,17 @@ public final class RuleConfigurationsAlteredEvent {
     
     private final String schemaName;
     
-    private final String yamlDataSourceContent;
+    private final String sourceDataSource;
     
-    private final String yamlRuleConfigurationsContent;
+    private final String sourceRule;
     
-    private final String cachedYamlRuleConfigurationsContent;
+    private final String targetDataSource;
     
-    private final String ruleConfigurationCacheId;
+    private final String targetRule;
+    
+    private final String ruleCacheId;
+    
+    public RuleConfigurationsAlteredEvent(final String schemaName, final String sourceDataSource, final String sourceRule, final String targetRule, final String ruleCacheId) {
+        this(schemaName, sourceDataSource, sourceRule, sourceDataSource, targetRule, ruleCacheId);
+    }
 }
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
deleted file mode 100644
index 6669979..0000000
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/callback/ScalingResultCallback.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.governance.core.scaling.callback;
-
-import org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
-import org.apache.shardingsphere.governance.core.scaling.ScalingServiceHolder;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.lock.LockContext;
-import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Scaling result callback.
- */
-public final class ScalingResultCallback implements ScalingCallback {
-    
-    private final String schemaName;
-    
-    private final String ruleConfigurationCacheId;
-    
-    public ScalingResultCallback(final String schemaName, final String ruleConfigurationCacheId) {
-        this.schemaName = schemaName;
-        this.ruleConfigurationCacheId = ruleConfigurationCacheId;
-    }
-    
-    @Override
-    public void onSuccess(final long jobId) {
-        if (LockContext.getLockStrategy().tryLock(30L, TimeUnit.SECONDS) && LockContext.getLockStrategy().checkLock()) {
-            try {
-                Thread.sleep(30000L);
-                if (ScalingServiceHolder.getInstance().checkScalingResult(jobId)) {
-                    ScalingServiceHolder.getInstance().stopScalingJob(jobId);
-                    ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(schemaName, ruleConfigurationCacheId));
-                }  
-            } catch (final InterruptedException ignored) {
-            } finally {
-                LockContext.getLockStrategy().releaseLock();
-            }
-        }
-    }
-    
-    @Override
-    public void onFailure(final long jobId) {
-        // TODO
-    }
-}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
index 6e46151..b7410ab 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/impl/GovernanceBootstrapInitializer.java
@@ -21,7 +21,6 @@ import org.apache.shardingsphere.governance.context.metadata.GovernanceMetaDataC
 import org.apache.shardingsphere.governance.context.transaction.GovernanceTransactionContexts;
 import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
 import org.apache.shardingsphere.governance.core.lock.strategy.GovernanceLockStrategy;
-import org.apache.shardingsphere.governance.core.scaling.ScalingServiceHolder;
 import org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceConfigurationYamlSwapper;
 import org.apache.shardingsphere.infra.auth.builtin.DefaultAuthentication;
 import org.apache.shardingsphere.infra.auth.builtin.yaml.config.YamlAuthenticationConfiguration;
@@ -41,8 +40,8 @@ import org.apache.shardingsphere.proxy.config.yaml.YamlProxyRuleConfiguration;
 import org.apache.shardingsphere.proxy.config.yaml.YamlProxyServerConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.service.impl.DistributedScalingJobService;
 import org.apache.shardingsphere.scaling.core.spi.ScalingWorkerLoader;
+import org.apache.shardingsphere.scaling.core.workflow.ScalingServiceHolder;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
 
 import java.util.Collection;
@@ -145,7 +144,7 @@ public final class GovernanceBootstrapInitializer extends AbstractBootstrapIniti
             serverConfiguration.setDistributedScalingService(new GovernanceConfigurationYamlSwapper().swapToObject(yamlConfig.getServerConfiguration().getGovernance()));
             ScalingContext.getInstance().init(serverConfiguration);
             ScalingWorkerLoader.initScalingWorker();
-            ScalingServiceHolder.getInstance().init(new DistributedScalingJobService());
+            ScalingServiceHolder.getInstance().init();
         }
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java
index 17571c4..3f98bc0 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/java/org/apache/shardingsphere/scaling/ScalingWorkerBootstrap.java
@@ -23,8 +23,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.scaling.core.spi.ScalingWorkerLoader;
 import org.apache.shardingsphere.scaling.util.ServerConfigurationInitializer;
 
-import java.util.Optional;
-
 
 /**
  * Bootstrap of ShardingSphere-Scaling worker.
@@ -43,12 +41,7 @@ public final class ScalingWorkerBootstrap {
         // CHECKSTYLE:ON
         log.info("ShardingSphere-Scaling Worker Startup");
         ServerConfigurationInitializer.init();
-        Optional<String> type = ScalingWorkerLoader.initScalingWorker();
-        if (!type.isPresent()) {
-            log.error("None worker found.");
-            return;
-        }
-        log.info("Worker type: {}", type.get());
+        ScalingWorkerLoader.initScalingWorker();
         wait0();
     }
     
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index c18b682..f0a68ad 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -41,6 +41,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-governance-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
             <artifactId>shardingsphere-governance-repository-api</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index f8f8158..e685d50 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -41,5 +41,5 @@ public final class JobConfiguration {
     
     private boolean running = true;
     
-    private long allowDelay = 60 * 1000L;
+    private WorkflowConfiguration workflowConfig;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/WorkflowConfiguration.java
similarity index 69%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/WorkflowConfiguration.java
index ee94de0..47f1e5d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingCallback.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/WorkflowConfiguration.java
@@ -15,24 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.service;
+package org.apache.shardingsphere.scaling.core.config;
 
-/**
- * Scaling callback.
- */
-public interface ScalingCallback {
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@RequiredArgsConstructor
+public final class WorkflowConfiguration {
+    
+    private long allowDelay = 60 * 1000L;
     
-    /**
-     * Callback when execute success.
-     *
-     * @param jobId job id
-     */
-    void onSuccess(long jobId);
+    private final String schemaName;
     
-    /**
-     * Callback when execute failure.
-     *
-     * @param jobId job id
-     */
-    void onFailure(long jobId);
+    private final String ruleCacheId;
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
index c677fd4..3c0ef9d 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/constant/ScalingConstant.java
@@ -62,11 +62,21 @@ public final class ScalingConstant {
     public static final String CONFIG = "config";
     
     /**
+     * Status.
+     */
+    public static final String STATUS = "status";
+    
+    /**
      * Position.
      */
     public static final String POSITION = "position";
     
     /**
+     * Workflow.
+     */
+    public static final String WORKFLOW = "workflow";
+    
+    /**
      * Incremental.
      */
     public static final String INCREMENTAL = "incremental";
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
index 40f5644..162bf64 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/AbstractScalingJobService.java
@@ -17,27 +17,22 @@
 
 package org.apache.shardingsphere.scaling.core.service;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.RuleConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
+import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
 import org.apache.shardingsphere.scaling.core.config.datasource.ShardingSphereJDBCDataSourceConfiguration;
-import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckerFactory;
 import org.apache.shardingsphere.scaling.core.job.environmental.ScalingEnvironmentalManager;
-import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
 
 import java.sql.SQLException;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Abstract scaling job service.
@@ -45,29 +40,15 @@ import java.util.concurrent.TimeUnit;
 @Slf4j
 public abstract class AbstractScalingJobService implements ScalingJobService {
     
-    private static final ScheduledExecutorService FINISH_CHECK_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("Scaling-finish-check-%d"));
-    
     @Override
-    public Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule, final ScalingCallback scalingCallback) {
-        log.info("start scaling job...");
-        log.info("sourceDataSource = {}", sourceDataSource);
-        log.info("sourceRule = {}", sourceRule);
-        log.info("targetDataSource = {}", targetDataSource);
-        log.info("targetRule = {}", targetRule);
-        Optional<ScalingJob> result = start(sourceDataSource, sourceRule, targetDataSource, targetRule);
-        if (!result.isPresent()) {
-            return result;
-        }
-        FINISH_CHECK_EXECUTOR.scheduleWithFixedDelay(new JobFinishChecker(result.get(), scalingCallback), 1, 1, TimeUnit.MINUTES);
-        return result;
-    }
-    
-    private Optional<ScalingJob> start(final String sourceDataSource, final String sourceRule, final String targetDataSource, final String targetRule) {
+    public Optional<ScalingJob> start(final RuleConfigurationsAlteredEvent event) {
         ScalingConfiguration scalingConfig = new ScalingConfiguration();
         scalingConfig.setRuleConfiguration(new RuleConfiguration(
-                new ShardingSphereJDBCDataSourceConfiguration(sourceDataSource, sourceRule),
-                new ShardingSphereJDBCDataSourceConfiguration(targetDataSource, targetRule)));
-        scalingConfig.setJobConfiguration(new JobConfiguration());
+                new ShardingSphereJDBCDataSourceConfiguration(event.getSourceDataSource(), event.getSourceRule()),
+                new ShardingSphereJDBCDataSourceConfiguration(event.getTargetDataSource(), event.getTargetRule())));
+        JobConfiguration jobConfig = new JobConfiguration();
+        jobConfig.setWorkflowConfig(new WorkflowConfiguration(event.getSchemaName(), event.getRuleCacheId()));
+        scalingConfig.setJobConfiguration(jobConfig);
         return start(scalingConfig);
     }
     
@@ -88,38 +69,4 @@ public abstract class AbstractScalingJobService implements ScalingJobService {
     public void reset(final long jobId) throws SQLException {
         new ScalingEnvironmentalManager().resetTargetTable(getJob(jobId));
     }
-    
-    @RequiredArgsConstructor
-    private class JobFinishChecker implements Runnable {
-        
-        private final ScalingJob scalingJob;
-        
-        private final ScalingCallback scalingCallback;
-        
-        private boolean executed;
-        
-        @Override
-        public void run() {
-            if (executed) {
-                return;
-            }
-            long jobId = scalingJob.getJobId();
-            try {
-                JobProgress jobProgress = getProgress(jobId);
-                if (jobProgress.getStatus().contains("FAILURE")) {
-                    log.warn("scaling job {} failure.", jobId);
-                    executed = true;
-                    scalingCallback.onFailure(jobId);
-                } else if (ScalingTaskUtil.allTasksAlmostFinished(jobProgress, scalingJob.getScalingConfig().getJobConfiguration())) {
-                    log.info("scaling job {} almost finished.", jobId);
-                    executed = true;
-                    scalingCallback.onSuccess(jobId);
-                }
-                // CHECKSTYLE:OFF
-            } catch (final Exception ex) {
-                // CHECKSTYLE:ON
-                log.error("scaling job {} finish check failed!", jobId, ex);
-            }
-        }
-    }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
index a488f46..89bb13a 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/service/ScalingJobService.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.scaling.core.service;
 
+import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
 import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
@@ -50,14 +51,10 @@ public interface ScalingJobService {
     /**
      * Start scaling job.
      *
-     * @param sourceDataSource source data source
-     * @param sourceRule source rule
-     * @param targetDataSource target data source
-     * @param targetRule target rule
-     * @param scalingCallback scaling callback
+     * @param event rule configurations altered event
      * @return scaling job
      */
-    Optional<ScalingJob> start(String sourceDataSource, String sourceRule, String targetDataSource, String targetRule, ScalingCallback scalingCallback);
+    Optional<ScalingJob> start(RuleConfigurationsAlteredEvent event);
     
     /**
      * Stop job.
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java
index 0676f73..9c2fe00 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorker.java
@@ -25,13 +25,6 @@ import org.apache.shardingsphere.governance.repository.api.config.GovernanceConf
 public interface ScalingWorker {
     
     /**
-     * Get type.
-     *
-     * @return type of scaling worker.
-     */
-    String getType();
-    
-    /**
      * Init scaling worker.
      *
      * @param governanceConfig governance configuration
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java
index a59f08a..005c09e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoader.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
 
 import java.util.Collection;
-import java.util.Optional;
 
 /**
  * Scaling worker loader.
@@ -33,10 +32,8 @@ public final class ScalingWorkerLoader {
     
     /**
      * Init scaling worker.
-     *
-     * @return worker type
      */
-    public static Optional<String> initScalingWorker() {
+    public static void initScalingWorker() {
         log.info("Init scaling worker");
         ShardingSphereServiceLoader.register(ScalingWorker.class);
         GovernanceConfiguration governanceConfig = ScalingContext.getInstance().getServerConfig().getDistributedScalingService();
@@ -44,10 +41,7 @@ public final class ScalingWorkerLoader {
             Collection<ScalingWorker> scalingWorkers = ShardingSphereServiceLoader.newServiceInstances(ScalingWorker.class);
             for (ScalingWorker each : scalingWorkers) {
                 each.init(governanceConfig);
-                return Optional.of(each.getType());
             }
-            log.error("None worker found.");
         }
-        return Optional.empty();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
index 778fd13..b15dec6 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/ScalingTaskUtil.java
@@ -50,7 +50,7 @@ public final class ScalingTaskUtil {
      */
     public static boolean allTasksAlmostFinished(final JobProgress jobProgress, final JobConfiguration jobConfig) {
         return jobProgress.getInventoryTaskProgress().stream().allMatch(each -> each.getTotal() == each.getFinished())
-                && jobProgress.getIncrementalTaskProgress().stream().allMatch(each -> each.getDelayMillisecond() < jobConfig.getAllowDelay());
+                && jobProgress.getIncrementalTaskProgress().stream().allMatch(each -> each.getDelayMillisecond() < jobConfig.getWorkflowConfig().getAllowDelay());
     }
     
     /**
diff --git a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java
similarity index 62%
rename from shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
rename to shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java
index 1a31236..eb32895 100644
--- a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/scaling/ScalingServiceHolder.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/workflow/ScalingServiceHolder.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.governance.core.scaling;
+package org.apache.shardingsphere.scaling.core.workflow;
 
 import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
 import org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
-import org.apache.shardingsphere.governance.core.scaling.callback.ScalingResultCallback;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
+import org.apache.shardingsphere.scaling.core.service.ScalingJobServiceFactory;
 
 import java.util.Map;
 import java.util.Optional;
@@ -38,15 +38,11 @@ public final class ScalingServiceHolder {
     
     private static final ScalingServiceHolder INSTANCE = new ScalingServiceHolder();
     
-    private volatile ScalingJobService scalingJobService;
-    
-    private ScalingServiceHolder() {
-        ShardingSphereEventBus.getInstance().register(this);
-    }
+    private final ScalingJobService scalingJobService = ScalingJobServiceFactory.getInstance();
     
     /**
      * Get scaling service holder instance.
-     * 
+     *
      * @return scaling service holder instance
      */
     public static ScalingServiceHolder getInstance() {
@@ -54,57 +50,50 @@ public final class ScalingServiceHolder {
     }
     
     /**
-     * Init scaling service.
-     * 
-     * @param scalingJobService scaling job service
+     * Init.
      */
-    public void init(final ScalingJobService scalingJobService) {
-        this.scalingJobService = scalingJobService;
+    public void init() {
+        ShardingSphereEventBus.getInstance().register(this);
     }
     
     /**
      * Start scaling job.
-     * 
+     *
      * @param event rule configurations altered event.
      */
     @Subscribe
     public void startScalingJob(final RuleConfigurationsAlteredEvent event) {
-        Optional<ScalingJob> scalingJob = scalingJobService.start(event.getYamlDataSourceContent(), 
-                event.getYamlRuleConfigurationsContent(), event.getYamlDataSourceContent(), 
-                event.getCachedYamlRuleConfigurationsContent(), new ScalingResultCallback(event.getSchemaName(), event.getRuleConfigurationCacheId()));
+        Optional<ScalingJob> scalingJob = scalingJobService.start(event);
         if (!scalingJob.isPresent()) {
-            ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleConfigurationCacheId()));
+            ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(event.getSchemaName(), event.getRuleCacheId()));
         }
     }
     
     /**
      * Check scaling result.
-     * 
+     *
      * @param jobId job Id
      * @return true if scaling result check successfully, else false
      */
     public boolean checkScalingResult(final long jobId) {
-        return checkScalingResult(jobId, scalingJobService.check(jobId));
-    }
-    
-    private boolean checkScalingResult(final long jobId, final Map<String, DataConsistencyCheckResult> scalingResult) {
-        if (!scalingResult.isEmpty()) {
-            for (String key : scalingResult.keySet()) {
-                boolean isDataValid = scalingResult.get(key).isDataValid(); 
-                boolean isCountValid = scalingResult.get(key).isCountValid();
-                if (!isDataValid || !isCountValid) {
-                    log.error("Scaling job: {}, table: {} data consistency check failed, dataValid: {}, countValid: {}", jobId, key, isDataValid, isCountValid);
-                    return false;
-                }
+        Map<String, DataConsistencyCheckResult> scalingResult = scalingJobService.check(jobId);
+        if (scalingResult.isEmpty()) {
+            return false;
+        }
+        for (String key : scalingResult.keySet()) {
+            boolean isDataValid = scalingResult.get(key).isDataValid();
+            boolean isCountValid = scalingResult.get(key).isCountValid();
+            if (!isDataValid || !isCountValid) {
+                log.error("Scaling job: {}, table: {} data consistency check failed, dataValid: {}, countValid: {}", jobId, key, isDataValid, isCountValid);
+                return false;
             }
-            return true;
         }
-        return false;
+        return true;
     }
     
     /**
      * Stop scaling job.
-     * 
+     *
      * @param jobId job id
      */
     public void stopScalingJob(final long jobId) {
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
index 7184581..f4606d7 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/service/impl/DistributedScalingJobServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.scaling.core.service.impl;
 
 import com.google.gson.Gson;
 import lombok.SneakyThrows;
+import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsAlteredEvent;
 import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
 import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
 import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
@@ -31,7 +32,6 @@ import org.apache.shardingsphere.scaling.core.exception.ScalingJobNotFoundExcept
 import org.apache.shardingsphere.scaling.core.job.JobProgress;
 import org.apache.shardingsphere.scaling.core.job.ScalingJob;
 import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
-import org.apache.shardingsphere.scaling.core.service.ScalingCallback;
 import org.apache.shardingsphere.scaling.core.service.ScalingJobService;
 import org.apache.shardingsphere.scaling.core.util.ScalingConfigurationUtil;
 import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
@@ -81,7 +81,8 @@ public final class DistributedScalingJobServiceTest {
     public void assertStartWithCallbackImmediately() {
         ScalingConfiguration scalingConfig = mockScalingConfiguration();
         ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
-        Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), source.getDataSource(), source.getRule(), mockScalingCallback());
+        RuleConfigurationsAlteredEvent event = new RuleConfigurationsAlteredEvent("schema", source.getDataSource(), source.getRule(), source.getRule(), "cacheId");
+        Optional<ScalingJob> scalingJob = scalingJobService.start(event);
         assertFalse(scalingJob.isPresent());
     }
     
@@ -90,7 +91,9 @@ public final class DistributedScalingJobServiceTest {
         ScalingConfiguration scalingConfig = ScalingConfigurationUtil.initConfig("/config_sharding_sphere_jdbc_target.json");
         ShardingSphereJDBCDataSourceConfiguration source = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getSource().unwrap();
         ShardingSphereJDBCDataSourceConfiguration target = (ShardingSphereJDBCDataSourceConfiguration) scalingConfig.getRuleConfiguration().getTarget().unwrap();
-        Optional<ScalingJob> scalingJob = scalingJobService.start(source.getDataSource(), source.getRule(), target.getDataSource(), target.getRule(), mockScalingCallback());
+        RuleConfigurationsAlteredEvent event = new RuleConfigurationsAlteredEvent(
+                "schema", source.getDataSource(), source.getRule(), target.getDataSource(), target.getRule(), "cacheId");
+        Optional<ScalingJob> scalingJob = scalingJobService.start(event);
         assertTrue(scalingJob.isPresent());
     }
     
@@ -146,19 +149,6 @@ public final class DistributedScalingJobServiceTest {
         return result;
     }
     
-    private ScalingCallback mockScalingCallback() {
-        return new ScalingCallback() {
-            
-            @Override
-            public void onSuccess(final long jobId) {
-            }
-            
-            @Override
-            public void onFailure(final long jobId) {
-            }
-        };
-    }
-    
     @SneakyThrows(ReflectiveOperationException.class)
     private void resetRegistryRepositoryAvailable() {
         ReflectionUtil.setStaticFieldValue(RegistryRepositoryHolder.class, "available", null);
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoaderTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoaderTest.java
deleted file mode 100644
index e5e0048..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/spi/ScalingWorkerLoaderTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.core.spi;
-
-import lombok.SneakyThrows;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
-import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
-import org.apache.shardingsphere.scaling.core.config.ScalingContext;
-import org.apache.shardingsphere.scaling.core.config.ServerConfiguration;
-import org.apache.shardingsphere.scaling.core.utils.ReflectionUtil;
-import org.junit.Test;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public final class ScalingWorkerLoaderTest {
-    
-    @Test
-    @SneakyThrows(ReflectiveOperationException.class)
-    public void assertWithoutDistributedScalingService() {
-        ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", new ServerConfiguration());
-        assertFalse(ScalingWorkerLoader.initScalingWorker().isPresent());
-    }
-    
-    @Test
-    @SneakyThrows(ReflectiveOperationException.class)
-    public void assertScalingWorkerAvailable() {
-        ReflectionUtil.setFieldValue(ScalingContext.getInstance(), "serverConfig", mockServerConfiguration());
-        assertTrue(ScalingWorkerLoader.initScalingWorker().isPresent());
-    }
-    
-    private ServerConfiguration mockServerConfiguration() {
-        ServerConfiguration result = new ServerConfiguration();
-        result.setDistributedScalingService(new GovernanceConfiguration("test", new GovernanceCenterConfiguration("Zookeeper", "", null), false));
-        return result;
-    }
-}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
deleted file mode 100644
index 1209df9..0000000
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.scaling.core.fixture.FixtureScalingWorker
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
index 3abd77f..0abdbf2 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ElasticJobScalingWorker.java
@@ -65,13 +65,8 @@ public final class ElasticJobScalingWorker implements ScalingWorker {
     private CoordinatorRegistryCenter registryCenter;
     
     @Override
-    public String getType() {
-        return "ElasticJob";
-    }
-    
-    @Override
     public void init(final GovernanceConfiguration governanceConfig) {
-        log.info("Scaling elastic job init...");
+        log.info("Init elastic job scaling worker.");
         this.governanceConfig = governanceConfig;
         registryCenter = ElasticJobUtils.createRegistryCenter(governanceConfig);
         watchConfigRepository();
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingWorker.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/FinishedCheckWorker.java
similarity index 52%
rename from shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingWorker.java
rename to shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/FinishedCheckWorker.java
index c440d3c..94ad402 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingWorker.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/FinishedCheckWorker.java
@@ -15,19 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.fixture;
+package org.apache.shardingsphere.scaling.elasticjob;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
+import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
 import org.apache.shardingsphere.governance.repository.api.config.GovernanceConfiguration;
 import org.apache.shardingsphere.scaling.core.spi.ScalingWorker;
+import org.apache.shardingsphere.scaling.elasticjob.job.FinishedCheckJob;
+import org.apache.shardingsphere.scaling.elasticjob.util.ElasticJobUtils;
 
-public final class FixtureScalingWorker implements ScalingWorker {
+/**
+ * Finished check worker.
+ */
+@Slf4j
+public final class FinishedCheckWorker implements ScalingWorker {
     
-    @Override
-    public String getType() {
-        return "Fixture";
-    }
+    private static final String JOB_NAME = "finished_check";
+    
+    private static final String CRON_EXPRESSION = "0 * * * * ?";
     
     @Override
     public void init(final GovernanceConfiguration governanceConfig) {
+        log.info("Init finished check worker.");
+        new ScheduleJobBootstrap(ElasticJobUtils.createRegistryCenter(governanceConfig), new FinishedCheckJob(), createJobConfig()).schedule();
+    }
+    
+    private JobConfiguration createJobConfig() {
+        return JobConfiguration.newBuilder(JOB_NAME, 1).cron(CRON_EXPRESSION).build();
     }
 }
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java
new file mode 100644
index 0000000..844096a
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/FinishedCheckJob.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.elasticjob.job;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.governance.core.event.model.rule.SwitchRuleConfigurationEvent;
+import org.apache.shardingsphere.governance.repository.api.RegistryRepository;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.lock.LockContext;
+import org.apache.shardingsphere.scaling.core.config.WorkflowConfiguration;
+import org.apache.shardingsphere.scaling.core.constant.ScalingConstant;
+import org.apache.shardingsphere.scaling.core.job.ScalingJob;
+import org.apache.shardingsphere.scaling.core.service.RegistryRepositoryHolder;
+import org.apache.shardingsphere.scaling.core.service.impl.DistributedScalingJobService;
+import org.apache.shardingsphere.scaling.core.utils.ScalingTaskUtil;
+import org.apache.shardingsphere.scaling.core.utils.ThreadUtil;
+import org.apache.shardingsphere.scaling.core.workflow.ScalingServiceHolder;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public final class FinishedCheckJob implements SimpleJob {
+    
+    private static final RegistryRepository REGISTRY_REPOSITORY = RegistryRepositoryHolder.getInstance();
+    
+    private final DistributedScalingJobService scalingJobService = new DistributedScalingJobService();
+    
+    @Override
+    public void execute(final ShardingContext shardingContext) {
+        List<String> jobs = REGISTRY_REPOSITORY.getChildrenKeys(ScalingConstant.SCALING_LISTENER_PATH);
+        for (String each : jobs) {
+            long jobId = Long.parseLong(each);
+            try {
+                ScalingJob scalingJob = scalingJobService.getJob(jobId);
+                WorkflowConfiguration workflowConfig = scalingJob.getScalingConfig().getJobConfiguration().getWorkflowConfig();
+                if (workflowConfig == null) {
+                    continue;
+                }
+                if (ScalingTaskUtil.allTasksAlmostFinished(scalingJobService.getProgress(jobId), scalingJob.getScalingConfig().getJobConfiguration())) {
+                    log.info("scaling job {} almost finished.", jobId);
+                    trySwitch(jobId, workflowConfig);
+                }
+                // CHECKSTYLE:OFF
+            } catch (final Exception ex) {
+                // CHECKSTYLE:ON
+                log.error("scaling job {} finish check failed!", jobId, ex);
+            }
+        }
+    }
+    
+    private void trySwitch(final long jobId, final WorkflowConfiguration workflowConfig) {
+        if (LockContext.getLockStrategy().tryLock(30L, TimeUnit.SECONDS) && LockContext.getLockStrategy().checkLock()) {
+            try {
+                ThreadUtil.sleep(10 * 1000L);
+                if (ScalingServiceHolder.getInstance().checkScalingResult(jobId)) {
+                    ScalingServiceHolder.getInstance().stopScalingJob(jobId);
+                    ShardingSphereEventBus.getInstance().post(new SwitchRuleConfigurationEvent(workflowConfig.getSchemaName(), workflowConfig.getRuleCacheId()));
+                }
+            } finally {
+                LockContext.getLockStrategy().releaseLock();
+            }
+        } else {
+            log.warn("can not get lock.");
+        }
+    }
+}
diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
index 25b2f8f..c4cfd5e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
+++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ScalingWorker
@@ -16,3 +16,4 @@
 #
 
 org.apache.shardingsphere.scaling.elasticjob.ElasticJobScalingWorker
+org.apache.shardingsphere.scaling.elasticjob.FinishedCheckWorker